FastMqttClient.cs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. using Edge.Core.IndustryStandardInterface.NetworkController;
  2. using Microsoft.Extensions.Logging;
  3. using MQTTnet;
  4. using MQTTnet.Client;
  5. using MQTTnet.Diagnostics;
  6. using MQTTnet.Protocol;
  7. using System;
  8. using System.Collections.Generic;
  9. using System.Linq;
  10. using System.Text;
  11. using System.Threading.Tasks;
  12. namespace DeviceInfoToAliIotHubViaGateway
  13. {
  14. class FastMqttClient : IMqttClientNetworkController
  15. {
  16. private IMqttClient mqttClient;
  17. public string Name => this.GetType().FullName;
  18. public event EventHandler<OnMqttMessageReceivedEventArg> OnMessageReceived;
  19. public event EventHandler<NetworkControllerOnStateChangeEventArg> OnNetworkControllerStateChange;
  20. private ILogger logger = null;
  21. public FastMqttClient(ILogger logger)
  22. {
  23. this.logger = logger;
  24. // Write all trace messages to the console window.
  25. //MqttNetGlobalLogger.LogMessagePublished += (s, e) =>
  26. //{
  27. // //var trace = $">> [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}";
  28. // var trace = $"{e.TraceMessage.Message}";
  29. // if (e.TraceMessage.Exception != null)
  30. // trace += Environment.NewLine + e.TraceMessage.Exception.ToString();
  31. // this.logger.LogTrace(trace);
  32. //};
  33. // Create a new MQTT client.
  34. var factory = new MqttFactory();
  35. this.mqttClient = factory.CreateMqttClient();
  36. this.mqttClient.ApplicationMessageReceivedAsync += e =>
  37. {
  38. //Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
  39. //Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
  40. //Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
  41. //Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
  42. //Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
  43. //Console.WriteLine();
  44. this.logger.LogDebug($"======>>>FastMqttClient received at topic: {e.ApplicationMessage.Topic} with payload: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
  45. var safe = this.OnMessageReceived;
  46. safe?.Invoke(this, new OnMqttMessageReceivedEventArg(new MqttMessage()
  47. {
  48. Message = e.ApplicationMessage.Payload,
  49. Topic = e.ApplicationMessage.Topic,
  50. }));
  51. return Task.CompletedTask;
  52. };
  53. }
  54. public async Task<bool> CloseAsync()
  55. {
  56. this.logger.LogInformation($"Closing FastMqttClient");
  57. return true;
  58. }
  59. public async Task<bool> ConfigNetworkAsync(params string[] parameters)
  60. {
  61. throw new NotImplementedException();
  62. }
  63. public async Task<bool> ConnectAsync(string clientId, string userName, string password)
  64. {
  65. // Create TCP based options using the builder.
  66. var options = new MqttClientOptionsBuilder()
  67. .WithClientId(clientId)
  68. .WithTcpServer(this.mqttServerUrl, this.mqttServerPort)
  69. .WithCredentials(userName, password).WithTls()
  70. // aliyun require a high value on this period, like 2m, otherwise, rejected
  71. .WithKeepAlivePeriod(new TimeSpan(0, 2, 0))
  72. .WithCleanSession(true)
  73. .Build();
  74. this.logger.LogInformation($"Connecting to mqtt server with url: {this.mqttServerUrl + "-" + this.mqttServerPort}, clientId: {clientId}, userName: {userName}, pwd: {password}");
  75. var result = await this.mqttClient.ConnectAsync(options);
  76. this.logger.LogInformation($" Connect result is: {result.ResultCode}");
  77. if (result.ResultCode == MqttClientConnectResultCode.Success)
  78. return true;
  79. else
  80. return false;
  81. }
  82. public async Task<bool> DisconnectAsync()
  83. {
  84. this.logger.LogInformation($"Disconnecting FastMqttClient");
  85. await this.mqttClient.DisconnectAsync();
  86. return true;
  87. }
  88. private string mqttServerUrl;
  89. private int mqttServerPort;
  90. public async Task<bool> OpenAsync(string mqttServerUrl, int port)
  91. {
  92. this.mqttServerUrl = mqttServerUrl;
  93. this.mqttServerPort = port;
  94. return true;
  95. }
  96. public async Task<bool> PublishAsync(int msgId, string topic, byte[] byteMsg, Mqtt_QosLevel qosLevel, bool retain)
  97. {
  98. MqttClientPublishResult result = null;
  99. MqttApplicationMessage mqttMsg = new MqttApplicationMessage();
  100. mqttMsg.Payload = byteMsg;
  101. mqttMsg.Retain = retain;
  102. mqttMsg.Topic = topic;
  103. if (qosLevel == Mqtt_QosLevel.AtMostOnce)
  104. mqttMsg.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
  105. else if (qosLevel == Mqtt_QosLevel.AtLeastOnce)
  106. mqttMsg.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce;
  107. else if (qosLevel == Mqtt_QosLevel.ExactlyOnce)
  108. mqttMsg.QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce;
  109. result = await this.mqttClient.PublishAsync(mqttMsg);
  110. if (result.ReasonCode == MqttClientPublishReasonCode.Success)
  111. return true;
  112. return false;
  113. }
  114. public async Task<NetworkState> QueryStatusAsync()
  115. {
  116. return NetworkState.NetworkConnected;
  117. }
  118. public async Task<bool> ResetAsync()
  119. {
  120. await this.mqttClient.DisconnectAsync();
  121. return true;
  122. }
  123. public async Task<bool> SubscribeAsync(int msgId, string topic, Mqtt_QosLevel qosLevel)
  124. {
  125. MqttClientSubscribeResult results = null;
  126. if (qosLevel == Mqtt_QosLevel.AtMostOnce)
  127. results = await this.mqttClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce);
  128. else if (qosLevel == Mqtt_QosLevel.AtLeastOnce)
  129. results = await this.mqttClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce);
  130. else if (qosLevel == Mqtt_QosLevel.ExactlyOnce)
  131. results = await this.mqttClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.ExactlyOnce);
  132. if (results.Items.First().ResultCode == MqttClientSubscribeResultCode.GrantedQoS0
  133. || results.Items.First().ResultCode == MqttClientSubscribeResultCode.GrantedQoS1
  134. || results.Items.First().ResultCode == MqttClientSubscribeResultCode.GrantedQoS2)
  135. return true;
  136. else
  137. return false;
  138. }
  139. public async Task<bool> UnsubscribeAsync(int msgId, string topic)
  140. {
  141. var result = await this.mqttClient.UnsubscribeAsync(topic);
  142. if (result.Items.First().ResultCode == MqttClientUnsubscribeResultCode.Success)
  143. return true;
  144. else
  145. return false;
  146. }
  147. }
  148. }