GenericDeviceProcessor.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. using Edge.Core.MqttClient;
  2. using Edge.Core.Parser;
  3. using Edge.Core.Parser.BinaryParser.MessageEntity;
  4. using Edge.Core.Processor.Communicator;
  5. using Edge.Core.Processor.Dispatcher.Attributes;
  6. using Microsoft.Extensions.DependencyInjection;
  7. using Microsoft.Extensions.Logging;
  8. using Microsoft.Extensions.Logging.Abstractions;
  9. using System;
  10. using System.Collections.Concurrent;
  11. using System.Collections.Generic;
  12. using System.Diagnostics;
  13. using System.Linq;
  14. using System.Text;
  15. using System.Threading;
  16. using System.Threading.Tasks;
  17. namespace Edge.Core.Processor
  18. {
  19. [MetaPartsDescriptor(
  20. "lang-zh-cn:通用型设备处理器lang-en-us:GenericDeviceProcessor",
  21. "lang-zh-cn:通用型设备处理器lang-en-us:Generic processor for comm with devices")]
  22. public class GenericDeviceProcessor<TRaw, TMessage> : IDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
  23. {
  24. #region performance related
  25. //private static NLog.Logger perfLogger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Performance");
  26. private static ILogger perfLogger = NullLogger.Instance;
  27. private Stopwatch perfWatch_From_CommOnDataReceived_To_HandlerProcessed;
  28. private Stopwatch perfWatch_From_CommOnDataReceived_To_CommOnDataWriting;
  29. private ConcurrentQueue<string> perfPeriodLogStrings;
  30. private System.Timers.Timer perfPeriodToLogFileTimer;
  31. #endregion
  32. private object syncObject = new object();
  33. public string MetaConfigName { get; set; }
  34. public IContext<TRaw, TMessage> Context { get; protected set; }
  35. public IList<IInterceptor<TRaw, TMessage>> Interceptors { get; }
  36. public ICommunicator<TRaw, TMessage> Communicator { get; }
  37. public GenericDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
  38. {
  39. if (services != null)
  40. {
  41. var loggerFactory = services.GetRequiredService<ILoggerFactory>();
  42. perfLogger = loggerFactory.CreateLogger("Performance");
  43. }
  44. if (perfLogger.IsEnabled(LogLevel.Trace))
  45. {
  46. this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed = new Stopwatch();
  47. this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed.Stop();
  48. this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting = new Stopwatch();
  49. this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting.Stop();
  50. this.perfPeriodLogStrings = new ConcurrentQueue<string>();
  51. this.perfPeriodToLogFileTimer = new System.Timers.Timer(30 * 1000);// flush to log every 30 seconds
  52. this.perfPeriodToLogFileTimer.Elapsed += (a, b) =>
  53. {
  54. try
  55. {
  56. this.perfPeriodToLogFileTimer.Stop();
  57. if (!perfLogger.IsEnabled(LogLevel.Trace))
  58. {
  59. this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Stop();
  60. this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Stop();
  61. return;
  62. }
  63. }
  64. catch { }
  65. finally { }
  66. try
  67. {
  68. StringBuilder sb = null;
  69. string s = null;
  70. while (this.perfPeriodLogStrings.TryDequeue(out s))
  71. {
  72. if (sb == null) sb = new StringBuilder();
  73. sb.Append(s + Environment.NewLine);
  74. }
  75. if (sb != null && sb.Length > 0)
  76. perfLogger.LogTrace(sb.ToString());
  77. }
  78. finally
  79. {
  80. this.perfPeriodToLogFileTimer.Start();
  81. }
  82. };
  83. this.perfPeriodToLogFileTimer.Start();
  84. }
  85. this.Communicator = communicator;
  86. var incoming = new HistoryKeepIncoming<TMessage>(10);
  87. Outgoing<TRaw, TMessage> outgoing = new Outgoing<TRaw, TMessage>(incoming, services);
  88. this.Context = new Context<TRaw, TMessage>(this, handler, communicator, incoming, outgoing);
  89. this.Context.Outgoing.OnWriting += (s, a) =>
  90. {
  91. if (a.ExtraControlParameter != null)
  92. this.Communicator.Write(a.Message, a.ExtraControlParameter);
  93. else
  94. this.Communicator.Write(a.Message);
  95. };
  96. this.Communicator.OnConnected += (communicator, a) =>
  97. {
  98. outgoing.OnConnect();
  99. handler.SendQRCodeAsync();
  100. //if (communicator is IClinet)
  101. //{
  102. // IClinet clinet = (IClinet)communicator;
  103. // handler.SetTcpClient(clinet?.GetTcpClient(), clinet?.GetServerPort());
  104. // handler.SendQRCodeAsync();
  105. //}
  106. };
  107. this.Communicator.OnDisconnected += (communicator, a) =>
  108. {
  109. outgoing.OnDisconnect();
  110. //handler.OnTcpDisconnect();
  111. };
  112. this.Communicator.OnDataReceived += (s, a) =>
  113. {
  114. lock (this.syncObject)
  115. {
  116. if (perfLogger.IsEnabled(LogLevel.Trace))
  117. {
  118. this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Restart();
  119. this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Restart();
  120. }
  121. this.Context.Incoming.DisablePropagate = false;
  122. this.Context.Incoming.Message = a.Message;
  123. if (!this.Context.Incoming.DisablePropagate)
  124. handler.Process(this.Context);
  125. if (perfLogger.IsEnabled(LogLevel.Trace))
  126. this.perfPeriodLogStrings?.Enqueue(DateTime.Now.ToString("HH:mm:ss.fff") + " - "
  127. + " From_CommOnDataReceived_To_HandlerProcessed elapsed " + (this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.ElapsedMilliseconds ?? -1));
  128. }
  129. };
  130. this.Communicator.OnRawDataWriting += (s, a) =>
  131. {
  132. if (perfLogger.IsEnabled(LogLevel.Trace))
  133. {
  134. this.perfPeriodLogStrings?.Enqueue(DateTime.Now.ToString("HH:mm:ss.fff") + " - "
  135. + " From_CommOnDataReceived_To_CommOnDataWriting elapsed " + (this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.ElapsedMilliseconds ?? -1));
  136. }
  137. };
  138. //------------------------- MQTT ---------------------------------
  139. MqttClientService mqttClientService = new MqttClientService();
  140. mqttClientService.OnConnect += (s, a) => { };
  141. mqttClientService.OnDisconnect += (s, a) => { };
  142. mqttClientService.OnApplicationMessageReceived += (s, a) =>
  143. {
  144. var message = Encoding.UTF8.GetString(a.ApplicationMessage.Payload);
  145. handler.OnReceiveMqttMessage(message);
  146. };
  147. mqttClientService.Start();
  148. }
  149. //public void Dispose()
  150. //{
  151. // this.Communicator.Dispose();
  152. //}
  153. public async Task<bool> Start()
  154. {
  155. this.Context.Handler.Init(this.Context);
  156. var r = await this.Communicator.Start();
  157. return r;
  158. }
  159. public Task<bool> Stop()
  160. {
  161. this.perfPeriodToLogFileTimer?.Stop();
  162. this.Communicator.Dispose();
  163. this.Context.Dispose();
  164. if (this.Context.Handler is IDisposable dp)
  165. dp.Dispose();
  166. return Task.FromResult(true);
  167. }
  168. public Task Test(params object[] parameters)
  169. {
  170. return this.Context.Handler.Test(parameters);
  171. }
  172. }
  173. /// <summary>
  174. /// used for scenario of FC as master to actively polling devices.
  175. /// </summary>
  176. /// <typeparam name="TRaw"></typeparam>
  177. /// <typeparam name="TMessage"></typeparam>
  178. [MetaPartsDescriptor(
  179. "lang-zh-cn:能自动发送消息的设备处理器lang-en-us:AutoPollingDeviceProcessor",
  180. "lang-zh-cn:自动发送消息型处理器lang-en-us:Auto send polling message processor for comm with devices")]
  181. public class HalfDuplexActivePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
  182. {
  183. [ParamsJsonSchemas("HalfDuplexActivePollingDeviceProcessorCtorSchema")]
  184. public HalfDuplexActivePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, int autoPollingInterval, IServiceProvider services)
  185. : base(handler, communicator, services)
  186. {
  187. base.Context = new Context<TRaw, TMessage>(this, handler, communicator, base.Context.Incoming,
  188. new TimeWindowWithActivePollingOutgoing<TRaw, TMessage>(base.Context.Incoming, autoPollingInterval, services));
  189. this.Context.Outgoing.OnWriting += (s, a) =>
  190. {
  191. if (a.ExtraControlParameter != null)
  192. this.Communicator.Write(a.Message, a.ExtraControlParameter);
  193. else
  194. this.Communicator.Write(a.Message);
  195. };
  196. }
  197. }
  198. /// <summary>
  199. /// used for scenario of FC as slave to receive polling from devices.
  200. /// </summary>
  201. /// <typeparam name="TRaw"></typeparam>
  202. /// <typeparam name="TMessage"></typeparam>
  203. public class HalfDuplexNegativePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
  204. {
  205. public HalfDuplexNegativePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
  206. : base(handler, communicator, services)
  207. {
  208. base.Context = new Context<TRaw, TMessage>(this, handler, communicator, base.Context.Incoming,
  209. new TimeWindowWithNegativePollingOutgoing<TRaw, TMessage>(base.Context.Incoming, services));
  210. this.Context.Outgoing.OnWriting += (s, a) =>
  211. {
  212. if (a.ExtraControlParameter != null)
  213. this.Communicator.Write(a.Message, a.ExtraControlParameter);
  214. else
  215. this.Communicator.Write(a.Message);
  216. };
  217. }
  218. }
  219. }