Handler.cs 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. using Edge.Core.Processor;
  2. using Edge.Core.IndustryStandardInterface.Pump;
  3. using Microsoft.Extensions.DependencyInjection;
  4. using Microsoft.Extensions.Logging;
  5. using Microsoft.Extensions.Logging.Abstractions;
  6. using MQTTnet;
  7. using MQTTnet.Client;
  8. using SimpleScanner.MessageEntity;
  9. using System;
  10. using System.Linq;
  11. using System.Text;
  12. using System.Threading.Tasks;
  13. namespace SimpleScanner
  14. {
  /// <summary>
  /// Device handler that bridges a scanner device and an MQTT broker.
  /// Raw scanner messages handed to <see cref="Process"/> are published to the
  /// configured request topic; messages received on the configured reply topic
  /// are written back to the device through the context's outgoing channel.
  /// A background loop started by <see cref="Init"/> keeps the MQTT connection
  /// (and the reply-topic subscription) alive.
  /// </summary>
  public class Handler : IDeviceHandler<byte[], ScannerMessage>
  {
      /// <summary>Pause between two iterations of the (re)connect loop, in milliseconds.</summary>
      private const int ReconnectIntervalMilliseconds = 30000;

      private readonly string mqttServerUrl;
      private readonly int mqttServerPort;
      private readonly string mqttClientId;
      private readonly string mqttClientUsername;
      private readonly string mqttClientPassword;
      private readonly string topic_ScannerRequest;
      private readonly string topic_ScannerRequestReply;
      private readonly ILogger logger = NullLogger.Instance;

      // Assigned in Init; Process relies on Init having been called first.
      private IMqttClient mqttClient;

      // Written from the MQTT client's disconnect callback and read by the connect loop,
      // i.e. from different threads, hence volatile.
      private volatile bool shouldReconnect = true;

      /// <summary>
      /// Creates the handler and stores the MQTT connection settings. No connection is
      /// opened here; that happens in <see cref="Init"/>.
      /// </summary>
      /// <param name="mqttServerUrl">Host name or address of the MQTT broker.</param>
      /// <param name="mqttServerPort">TCP port of the MQTT broker.</param>
      /// <param name="mqttClientId">Client id presented to the broker.</param>
      /// <param name="mqttClientUsername">User name used for the broker credentials.</param>
      /// <param name="mqttClientPassword">Password used for the broker credentials.</param>
      /// <param name="topic_ScannerRequest">Topic that incoming scanner data is published to; null or empty disables publishing.</param>
      /// <param name="topic_ScannerRequestReply">Topic to subscribe to for data sent back to the device; null or empty disables the subscription.</param>
      /// <param name="services">Service provider used to resolve the <see cref="ILoggerFactory"/>.</param>
      public Handler(string mqttServerUrl, int mqttServerPort, string mqttClientId,
          string mqttClientUsername, string mqttClientPassword,
          string topic_ScannerRequest, string topic_ScannerRequestReply,
          IServiceProvider services)
      {
          var loggerFactory = services.GetRequiredService<ILoggerFactory>();
          this.logger = loggerFactory.CreateLogger("Application");
          this.mqttServerUrl = mqttServerUrl;
          this.mqttServerPort = mqttServerPort;
          this.mqttClientId = mqttClientId;
          this.mqttClientUsername = mqttClientUsername;
          this.mqttClientPassword = mqttClientPassword;
          this.topic_ScannerRequest = topic_ScannerRequest;
          this.topic_ScannerRequestReply = topic_ScannerRequestReply;
      }

      /// <summary>
      /// Creates the MQTT client, wires its receive/disconnect callbacks and starts a
      /// background loop that connects to the broker and reconnects whenever the
      /// connection is lost. Returns immediately; connecting happens in the background.
      /// </summary>
      /// <param name="context">Device context; its outgoing channel receives data arriving on the reply topic.</param>
      public void Init(IContext<byte[], ScannerMessage> context)
      {
          // Create a new MQTT client.
          var factory = new MqttFactory();
          this.mqttClient = factory.CreateMqttClient();

          this.mqttClient.ApplicationMessageReceivedAsync += e =>
          {
              var message = e.ApplicationMessage;
              if (this.logger.IsEnabled(LogLevel.Trace))
              {
                  this.logger.LogTrace("### Received Messages from Topic ###");
                  this.logger.LogTrace($" Topic = {message.Topic}");
                  this.logger.LogTrace($" Payload = {DecodePayload(message.Payload)}");
                  this.logger.LogTrace($" QoS = {message.QualityOfServiceLevel}");
                  this.logger.LogTrace($" Retain = {message.Retain}");
              }

              if (message.Topic != this.topic_ScannerRequestReply)
              {
                  return Task.CompletedTask;
              }

              try
              {
                  if (message.Payload == null)
                  {
                      // Nothing to hand to the device; skip instead of failing while decoding.
                      this.logger.LogInformation("mqtt server sent a msg without payload on topic: " +
                          (this.topic_ScannerRequestReply ?? "") + ", ignored.");
                  }
                  else
                  {
                      // the device could not be capable to receive any data
                      this.logger.LogInformation("how mqtt server send data to this device " +
                          "which is not capable for receive any command, send it anyway: " +
                          DecodePayload(message.Payload));
                      context.Outgoing.Write(new ScannerMessage()
                      {
                          RawContent = this.ParseFormatDataToDeviceData(message.Payload)
                      });
                  }
              }
              catch (Exception eeee)
              {
                  // Never let a failure in forwarding escape into the MQTT client's receive pipeline.
                  this.logger.LogInformation("handle mqtt server incoming msg received from topic: " +
                      (this.topic_ScannerRequestReply ?? "")
                      + ", exceptioned: " + eeee);
              }

              return Task.CompletedTask;
          };

          this.mqttClient.DisconnectedAsync += arg =>
          {
              // Ask the background loop to reconnect on its next iteration.
              this.shouldReconnect = true;
              this.logger.LogInformation(" Mqtt connection disconnected, " +
                  "AuthenticateResult: " + (arg.ReasonString ?? "") +
                  ", ClientWasConnected: " + arg.ClientWasConnected +
                  ", exception: " + arg.Exception);
              return Task.CompletedTask;
          };

          // Create TCP based options using the builder.
          var options = new MqttClientOptionsBuilder()
              .WithClientId(this.mqttClientId)
              .WithTcpServer(this.mqttServerUrl, this.mqttServerPort)
              .WithCredentials(this.mqttClientUsername, this.mqttClientPassword)
              .WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
              .WithCleanSession(true)
              .Build();

          // Fire-and-forget: the loop runs for the lifetime of the process and handles its
          // own exceptions, so there is nothing for the caller to await.
          _ = Task.Run(async () =>
          {
              while (true)
              {
                  // Also look at IsConnected so that a disconnect racing with the
                  // "connected" bookkeeping below cannot leave the client offline forever.
                  if (this.shouldReconnect || !this.mqttClient.IsConnected)
                  {
                      this.logger.LogDebug("Connecting to mqtt server...");
                      try
                      {
                          try
                          {
                              // always try to close previous conn.
                              await this.mqttClient.DisconnectAsync();
                          }
                          catch
                          {
                              // Best effort only: disconnecting a client that is not connected is expected to fail.
                          }

                          var result = await this.mqttClient.ConnectAsync(options);
                          if (result.ResultCode != MqttClientConnectResultCode.Success)
                          {
                              this.logger.LogInformation(" Failed connecting to mqtt server with returned ResultCode: "
                                  + result.ResultCode
                                  + ", ReasonString:" + (result.ReasonString ?? ""));
                          }
                          else
                          {
                              this.logger.LogInformation(" Successfully connected to mqtt server.");
                              if (string.IsNullOrEmpty(this.topic_ScannerRequestReply))
                              {
                                  // Publish-only mode: nothing to subscribe to. Keep the loop running
                                  // so that a later disconnect still triggers a reconnect.
                                  this.shouldReconnect = false;
                              }
                              else
                              {
                                  var subResult = await this.mqttClient.SubscribeAsync(this.topic_ScannerRequestReply,
                                      MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce);
                                  var firstItem = subResult.Items.FirstOrDefault();
                                  if (firstItem == null)
                                  {
                                      this.logger.LogInformation(" Failed on subscribe on topic by generic error.");
                                  }
                                  else if ((int)(firstItem.ResultCode) <= 2)
                                  {
                                      // Result codes 0..2 are treated as success (presumably the granted QoS levels).
                                      this.logger.LogInformation(" Successfully subscribed on topic.");
                                      this.shouldReconnect = false;
                                  }
                                  else
                                  {
                                      // shouldReconnect stays true, so the whole connect/subscribe cycle is retried.
                                      this.logger.LogInformation(" Failed on subscribe on topic with Sub resultCode: "
                                          + firstItem.ResultCode);
                                  }
                              }
                          }
                      }
                      catch (Exception exx)
                      {
                          this.logger.LogError(" Connecting to mqtt server exceptioned: " + exx);
                      }
                  }

                  // keep retry...
                  await Task.Delay(ReconnectIntervalMilliseconds);
              }
          });
      }

      /// <summary>
      /// Handles one message coming from the scanner: logs it and, when a request topic
      /// is configured, publishes its raw content to that topic. Publish failures
      /// (for example while the broker connection is down) are logged and swallowed,
      /// so the message is dropped rather than failing the caller.
      /// </summary>
      /// <param name="context">Device context carrying the incoming scanner message.</param>
      /// <returns>A task that completes once the publish attempt has finished.</returns>
      public async Task Process(IContext<byte[], ScannerMessage> context)
      {
          var rawContent = context.Incoming.Message.RawContent;
          if (rawContent == null || !rawContent.Any())
          {
              this.logger.LogDebug("Incoming an null or empty Scanner Msg.");
              return;
          }

          // Only build the hex dump when it will actually be written.
          if (this.logger.IsEnabled(LogLevel.Debug))
          {
              this.logger.LogDebug("Incoming Scanner msg: "
                  + string.Join(" ", rawContent.Select(b => b.ToString("X2")))
                  + " (" + Encoding.UTF8.GetString(rawContent) + ")");
          }

          if (string.IsNullOrEmpty(this.topic_ScannerRequest))
          {
              return;
          }

          try
          {
              await this.mqttClient.PublishAsync(new MqttApplicationMessage()
              {
                  Topic = this.topic_ScannerRequest,
                  Payload = rawContent
              });
          }
          catch (Exception ex)
          {
              this.logger.LogError("Publishing Scanner msg to topic: " + this.topic_ScannerRequest
                  + " exceptioned: " + ex);
          }
      }

      /// <summary>
      /// Converts data received from the MQTT server into the bytes sent to the device.
      /// Currently a pass-through: the input is returned unchanged.
      /// </summary>
      /// <param name="rawContent">Payload received from the MQTT server.</param>
      /// <returns>bytes that the device can read and operate</returns>
      private byte[] ParseFormatDataToDeviceData(byte[] rawContent)
      {
          return rawContent;
      }

      /// <summary>
      /// Decodes an MQTT payload as UTF-8 text for logging; a missing payload yields an empty string.
      /// </summary>
      private static string DecodePayload(byte[] payload)
      {
          return payload == null ? string.Empty : Encoding.UTF8.GetString(payload);
      }
  }
  188. }