GenericDeviceProcessor.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. using Edge.Core.MqttClient;
  2. using Edge.Core.Parser;
  3. using Edge.Core.Parser.BinaryParser.MessageEntity;
  4. using Edge.Core.Processor.Communicator;
  5. using Edge.Core.Processor.Dispatcher.Attributes;
  6. using Microsoft.Extensions.Configuration;
  7. using Microsoft.Extensions.DependencyInjection;
  8. using Microsoft.Extensions.Logging;
  9. using Microsoft.Extensions.Logging.Abstractions;
  10. using Newtonsoft.Json;
  11. using System;
  12. using System.Collections.Concurrent;
  13. using System.Collections.Generic;
  14. using System.Diagnostics;
  15. using System.Linq;
  16. using System.Text;
  17. using System.Threading;
  18. using System.Threading.Tasks;
  19. namespace Edge.Core.Processor
  20. {
  21. [MetaPartsDescriptor(
  22. "lang-zh-cn:通用型设备处理器lang-en-us:GenericDeviceProcessor",
  23. "lang-zh-cn:通用型设备处理器lang-en-us:Generic processor for comm with devices")]
  24. public class GenericDeviceProcessor<TRaw, TMessage> : IDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
  25. {
  26. #region performance related
  27. private static NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Performance");
  28. private static ILogger perfLogger = NullLogger.Instance;
  29. private Stopwatch perfWatch_From_CommOnDataReceived_To_HandlerProcessed;
  30. private Stopwatch perfWatch_From_CommOnDataReceived_To_CommOnDataWriting;
  31. private ConcurrentQueue<string> perfPeriodLogStrings;
  32. private System.Timers.Timer perfPeriodToLogFileTimer;
  33. #endregion
  34. private object syncObject = new object();
  35. public string MetaConfigName { get; set; }
  36. public IContext<TRaw, TMessage> Context { get; protected set; }
  37. public IList<IInterceptor<TRaw, TMessage>> Interceptors { get; }
  38. public ICommunicator<TRaw, TMessage> Communicator { get; }
  39. private static bool isStartMqtt = false;
  40. public GenericDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
  41. {
  42. if (services != null)
  43. {
  44. var loggerFactory = services.GetRequiredService<ILoggerFactory>();
  45. perfLogger = loggerFactory.CreateLogger("Performance");
  46. }
  47. if (perfLogger.IsEnabled(LogLevel.Trace))
  48. {
  49. this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed = new Stopwatch();
  50. this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed.Stop();
  51. this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting = new Stopwatch();
  52. this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting.Stop();
  53. this.perfPeriodLogStrings = new ConcurrentQueue<string>();
  54. this.perfPeriodToLogFileTimer = new System.Timers.Timer(30 * 1000);// flush to log every 30 seconds
  55. this.perfPeriodToLogFileTimer.Elapsed += (a, b) =>
  56. {
  57. try
  58. {
  59. this.perfPeriodToLogFileTimer.Stop();
  60. if (!perfLogger.IsEnabled(LogLevel.Trace))
  61. {
  62. this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Stop();
  63. this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Stop();
  64. return;
  65. }
  66. }
  67. catch { }
  68. finally { }
  69. try
  70. {
  71. StringBuilder sb = null;
  72. string s = null;
  73. while (this.perfPeriodLogStrings.TryDequeue(out s))
  74. {
  75. if (sb == null) sb = new StringBuilder();
  76. sb.Append(s + Environment.NewLine);
  77. }
  78. if (sb != null && sb.Length > 0)
  79. perfLogger.LogTrace(sb.ToString());
  80. }
  81. finally
  82. {
  83. this.perfPeriodToLogFileTimer.Start();
  84. }
  85. };
  86. this.perfPeriodToLogFileTimer.Start();
  87. }
  88. this.Communicator = communicator;
  89. var incoming = new HistoryKeepIncoming<TMessage>(10);
  90. Outgoing<TRaw, TMessage> outgoing = new Outgoing<TRaw, TMessage>(incoming, services);
  91. this.Context = new Context<TRaw, TMessage>(this, handler, communicator, incoming, outgoing);
  92. this.Context.Outgoing.OnWriting += (s, a) =>
  93. {
  94. if (a.ExtraControlParameter != null)
  95. this.Communicator.Write(a.Message, a.ExtraControlParameter);
  96. else
  97. this.Communicator.Write(a.Message);
  98. };
  99. this.Communicator.OnConnected += (communicator, a) =>
  100. {
  101. outgoing.OnConnect();
  102. handler.SendQRCodeAsync();
  103. //if (communicator is IClinet)
  104. //{
  105. // IClinet clinet = (IClinet)communicator;
  106. // handler.SetTcpClient(clinet?.GetTcpClient(), clinet?.GetServerPort());
  107. // handler.SendQRCodeAsync();
  108. //}
  109. };
  110. this.Communicator.OnDisconnected += (communicator, a) =>
  111. {
  112. outgoing.OnDisconnect();
  113. //handler.OnTcpDisconnect();
  114. };
  115. this.Communicator.OnDataReceived += (s, a) =>
  116. {
  117. lock (this.syncObject)
  118. {
  119. if (perfLogger.IsEnabled(LogLevel.Trace))
  120. {
  121. this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Restart();
  122. this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Restart();
  123. }
  124. this.Context.Incoming.DisablePropagate = false;
  125. this.Context.Incoming.Message = a.Message;
  126. if (!this.Context.Incoming.DisablePropagate)
  127. handler.Process(this.Context);
  128. if (perfLogger.IsEnabled(LogLevel.Trace))
  129. this.perfPeriodLogStrings?.Enqueue(DateTime.Now.ToString("HH:mm:ss.fff") + " - "
  130. + " From_CommOnDataReceived_To_HandlerProcessed elapsed " + (this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.ElapsedMilliseconds ?? -1));
  131. }
  132. };
  133. this.Communicator.OnRawDataWriting += (s, a) =>
  134. {
  135. if (perfLogger.IsEnabled(LogLevel.Trace))
  136. {
  137. this.perfPeriodLogStrings?.Enqueue(DateTime.Now.ToString("HH:mm:ss.fff") + " - "
  138. + " From_CommOnDataReceived_To_CommOnDataWriting elapsed " + (this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.ElapsedMilliseconds ?? -1));
  139. }
  140. };
  141. //------------------------- MQTT ---------------------------------
  142. if (!isStartMqtt)
  143. {
  144. MqttClientService mqttClientService = new MqttClientService(services.GetService<IConfiguration>());
  145. mqttClientService.OnConnect += (s, a) => { };
  146. mqttClientService.OnDisconnect += (s, a) => { };
  147. mqttClientService.OnApplicationMessageReceived += (s, a) =>
  148. {
  149. var message = Encoding.UTF8.GetString(a.ApplicationMessage.Payload);
  150. handler.OnReceiveMqttMessage(a.ApplicationMessage.Topic, message);
  151. };
  152. mqttClientService.Start();
  153. isStartMqtt = true;
  154. }
  155. }
  156. //public void Dispose()
  157. //{
  158. // this.Communicator.Dispose();
  159. //}
  160. public async Task<bool> Start()
  161. {
  162. this.Context.Handler.Init(this.Context);
  163. var r = await this.Communicator.Start();
  164. return r;
  165. }
  166. public Task<bool> Stop()
  167. {
  168. this.perfPeriodToLogFileTimer?.Stop();
  169. this.Communicator.Dispose();
  170. this.Context.Dispose();
  171. if (this.Context.Handler is IDisposable dp)
  172. dp.Dispose();
  173. return Task.FromResult(true);
  174. }
  175. public Task Test(params object[] parameters)
  176. {
  177. return this.Context.Handler.Test(parameters);
  178. }
  179. }
  180. /// <summary>
  181. /// used for scenario of FC as master to actively polling devices.
  182. /// </summary>
  183. /// <typeparam name="TRaw"></typeparam>
  184. /// <typeparam name="TMessage"></typeparam>
  185. [MetaPartsDescriptor(
  186. "lang-zh-cn:能自动发送消息的设备处理器lang-en-us:AutoPollingDeviceProcessor",
  187. "lang-zh-cn:自动发送消息型处理器lang-en-us:Auto send polling message processor for comm with devices")]
  188. public class HalfDuplexActivePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
  189. {
  190. [ParamsJsonSchemas("HalfDuplexActivePollingDeviceProcessorCtorSchema")]
  191. public HalfDuplexActivePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, int autoPollingInterval, IServiceProvider services)
  192. : base(handler, communicator, services)
  193. {
  194. base.Context = new Context<TRaw, TMessage>(this, handler, communicator, base.Context.Incoming,
  195. new TimeWindowWithActivePollingOutgoing<TRaw, TMessage>(base.Context.Incoming, autoPollingInterval, services));
  196. this.Context.Outgoing.OnWriting += (s, a) =>
  197. {
  198. if (a.ExtraControlParameter != null)
  199. this.Communicator.Write(a.Message, a.ExtraControlParameter);
  200. else
  201. this.Communicator.Write(a.Message);
  202. };
  203. }
  204. }
  205. /// <summary>
  206. /// used for scenario of FC as slave to receive polling from devices.
  207. /// </summary>
  208. /// <typeparam name="TRaw"></typeparam>
  209. /// <typeparam name="TMessage"></typeparam>
  210. public class HalfDuplexNegativePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
  211. {
  212. public HalfDuplexNegativePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
  213. : base(handler, communicator, services)
  214. {
  215. base.Context = new Context<TRaw, TMessage>(this, handler, communicator, base.Context.Incoming,
  216. new TimeWindowWithNegativePollingOutgoing<TRaw, TMessage>(base.Context.Incoming, services));
  217. this.Context.Outgoing.OnWriting += (s, a) =>
  218. {
  219. if (a.ExtraControlParameter != null)
  220. this.Communicator.Write(a.Message, a.ExtraControlParameter);
  221. else
  222. this.Communicator.Write(a.Message);
  223. };
  224. }
  225. }
  226. }