GenericDeviceProcessor.cs 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. using Edge.Core.Parser;
  2. using Edge.Core.Processor.Communicator;
  3. using Edge.Core.Processor.Dispatcher.Attributes;
  4. using Microsoft.Extensions.DependencyInjection;
  5. using Microsoft.Extensions.Logging;
  6. using Microsoft.Extensions.Logging.Abstractions;
  7. using System;
  8. using System.Collections.Concurrent;
  9. using System.Collections.Generic;
  10. using System.Diagnostics;
  11. using System.Linq;
  12. using System.Text;
  13. using System.Threading;
  14. using System.Threading.Tasks;
  15. namespace Edge.Core.Processor
  16. {
  /// <summary>
  /// Wires a device handler to a communicator: every message raised by the communicator's
  /// <c>OnDataReceived</c> event is stored on the context's Incoming and handed to the handler, and every
  /// message raised by the context's Outgoing <c>OnWriting</c> event is written to the communicator.
  /// When the "Performance" logger has Trace enabled at construction time, handling latency is sampled
  /// and flushed to that logger periodically.
  /// </summary>
  /// <typeparam name="TRaw">Raw data type exchanged with the communicator.</typeparam>
  /// <typeparam name="TMessage">Parsed message type.</typeparam>
  [MetaPartsDescriptor(
      "lang-zh-cn:通用型设备处理器lang-en-us:GenericDeviceProcessor",
      "lang-zh-cn:通用型设备处理器lang-en-us:Generic processor for comm with devices")]
  public class GenericDeviceProcessor<TRaw, TMessage> : IDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
  {
      #region performance related

      /// <summary>Interval, in milliseconds, between two flushes of the buffered performance lines.</summary>
      private const double PerfFlushIntervalMs = 30 * 1000;

      // Static on purpose (kept from the original design): shared by every instance of the same closed
      // generic type and re-pointed by each constructor call that receives a service provider.
      private static ILogger perfLogger = NullLogger.Instance;

      // All four members below stay null when Trace was disabled at construction time.
      private readonly Stopwatch perfWatch_From_CommOnDataReceived_To_HandlerProcessed;
      private readonly Stopwatch perfWatch_From_CommOnDataReceived_To_CommOnDataWriting;
      private readonly ConcurrentQueue<string> perfPeriodLogStrings;
      private readonly System.Timers.Timer perfPeriodToLogFileTimer;

      // Guards the timer's started/stopped/disposed state so that an in-flight Elapsed callback
      // can never re-start the timer after Stop() has shut it down.
      private readonly object perfTimerGate = new object();
      private bool perfTimerStopped;

      #endregion

      // Serializes handling of incoming messages.
      private readonly object syncObject = new object();

      public string MetaConfigName { get; set; }

      public IContext<TRaw, TMessage> Context { get; protected set; }

      /// <summary>Never assigned by this class, so it currently always reads as null.</summary>
      public IList<IInterceptor<TRaw, TMessage>> Interceptors { get; }

      public ICommunicator<TRaw, TMessage> Communicator { get; }

      /// <summary>
      /// Creates the processor, builds its context and subscribes to the communicator and outgoing events.
      /// </summary>
      /// <param name="handler">Handler that processes every propagated incoming message.</param>
      /// <param name="communicator">Communicator used to receive and write messages.</param>
      /// <param name="services">
      /// Optional service provider; when not null it must be able to resolve <see cref="ILoggerFactory"/>,
      /// which is used to create the "Performance" logger.
      /// </param>
      public GenericDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
      {
          if (services != null)
          {
              var loggerFactory = services.GetRequiredService<ILoggerFactory>();
              perfLogger = loggerFactory.CreateLogger("Performance");
          }

          if (perfLogger.IsEnabled(LogLevel.Trace))
          {
              // A freshly constructed Stopwatch is not running; it is (re)started on each received message.
              this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed = new Stopwatch();
              this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting = new Stopwatch();
              this.perfPeriodLogStrings = new ConcurrentQueue<string>();
              this.perfPeriodToLogFileTimer = new System.Timers.Timer(PerfFlushIntervalMs);
              this.perfPeriodToLogFileTimer.Elapsed += this.OnPerfFlushTimerElapsed;
              this.perfPeriodToLogFileTimer.Start();
          }

          this.Communicator = communicator;
          var incoming = new HistoryKeepIncoming<TMessage>(10);
          this.Context = new Context<TRaw, TMessage>(this, handler, communicator, incoming, new Outgoing<TRaw, TMessage>(incoming, services));

          // Outgoing -> communicator.
          this.Context.Outgoing.OnWriting += (s, a) =>
          {
              if (a.ExtraControlParameter != null)
              {
                  this.Communicator.Write(a.Message, a.ExtraControlParameter);
              }
              else
              {
                  this.Communicator.Write(a.Message);
              }
          };

          // Communicator -> handler. this.Context is read at event time, so a context replaced later
          // (e.g. by a derived constructor) is the one that receives the message.
          this.Communicator.OnDataReceived += (s, a) =>
          {
              lock (this.syncObject)
              {
                  if (perfLogger.IsEnabled(LogLevel.Trace))
                  {
                      this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Restart();
                      this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Restart();
                  }

                  // Reset the flag before publishing the message; it is re-read right after the
                  // assignment, so whatever runs as part of setting Message can veto the handler call.
                  this.Context.Incoming.DisablePropagate = false;
                  this.Context.Incoming.Message = a.Message;
                  if (!this.Context.Incoming.DisablePropagate)
                  {
                      handler.Process(this.Context);
                  }

                  this.RecordPerf(
                      " From_CommOnDataReceived_To_HandlerProcessed elapsed ",
                      this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed);
              }
          };

          this.Communicator.OnRawDataWriting += (s, a) =>
          {
              this.RecordPerf(
                  " From_CommOnDataReceived_To_CommOnDataWriting elapsed ",
                  this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting);
          };
      }

      /// <summary>Initializes the handler with the current context, then starts the communicator.</summary>
      /// <returns>The result reported by the communicator's Start.</returns>
      public async Task<bool> Start()
      {
          this.Context.Handler.Init(this.Context);
          var r = await this.Communicator.Start();
          return r;
      }

      /// <summary>
      /// Shuts down performance logging (stopping and disposing the flush timer and writing out any
      /// buffered lines), then disposes the communicator, the context and, when it is disposable, the handler.
      /// </summary>
      /// <returns>A completed task whose result is always true.</returns>
      public Task<bool> Stop()
      {
          this.ShutDownPerfLogging();
          this.Communicator.Dispose();
          this.Context.Dispose();
          if (this.Context.Handler is IDisposable dp)
          {
              dp.Dispose();
          }

          return Task.FromResult(true);
      }

      /// <summary>Forwards the test request to the handler.</summary>
      public Task Test(params object[] parameters)
      {
          return this.Context.Handler.Test(parameters);
      }

      /// <summary>Buffers one timestamped performance line when Trace is enabled and buffering is active.</summary>
      private void RecordPerf(string label, Stopwatch watch)
      {
          if (!perfLogger.IsEnabled(LogLevel.Trace))
          {
              return;
          }

          this.perfPeriodLogStrings?.Enqueue(DateTime.Now.ToString("HH:mm:ss.fff") + " - "
              + label + (watch?.ElapsedMilliseconds ?? -1));
      }

      /// <summary>Drains the buffered performance lines into a single Trace log entry.</summary>
      private void FlushPerfLog()
      {
          if (this.perfPeriodLogStrings == null)
          {
              return;
          }

          StringBuilder sb = null;
          while (this.perfPeriodLogStrings.TryDequeue(out var line))
          {
              if (sb == null)
              {
                  sb = new StringBuilder();
              }

              sb.Append(line).Append(Environment.NewLine);
          }

          if (sb != null && sb.Length > 0)
          {
              perfLogger.LogTrace(sb.ToString());
          }
      }

      /// <summary>Timer callback: pauses the timer, flushes buffered lines, then resumes unless stopped.</summary>
      private void OnPerfFlushTimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
      {
          lock (this.perfTimerGate)
          {
              if (this.perfTimerStopped)
              {
                  return;
              }

              // Pause while flushing so that a slow flush is not overlapped by the next tick.
              this.perfPeriodToLogFileTimer.Stop();
          }

          try
          {
              if (perfLogger.IsEnabled(LogLevel.Trace))
              {
                  this.FlushPerfLog();
              }
              else
              {
                  this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Stop();
                  this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Stop();
              }
          }
          catch (Exception ex)
          {
              // Performance logging is best effort and must never disturb message handling.
              Debug.WriteLine("Performance log flush failed: " + ex);
          }
          finally
          {
              // Keep ticking even while Trace is off: if the level is switched back on later the
              // buffer is drained again instead of growing without bound. Never resume after Stop().
              lock (this.perfTimerGate)
              {
                  if (!this.perfTimerStopped)
                  {
                      this.perfPeriodToLogFileTimer.Start();
                  }
              }
          }
      }

      /// <summary>Stops and disposes the flush timer exactly once and writes out what is still buffered.</summary>
      private void ShutDownPerfLogging()
      {
          if (this.perfPeriodToLogFileTimer == null)
          {
              return;
          }

          lock (this.perfTimerGate)
          {
              if (this.perfTimerStopped)
              {
                  return;
              }

              this.perfTimerStopped = true;
              this.perfPeriodToLogFileTimer.Stop();
              this.perfPeriodToLogFileTimer.Dispose();
          }

          try
          {
              if (perfLogger.IsEnabled(LogLevel.Trace))
              {
                  this.FlushPerfLog();
              }
          }
          catch (Exception ex)
          {
              // A failing final flush must not prevent the communicator and context from being disposed.
              Debug.WriteLine("Final performance log flush failed: " + ex);
          }
      }
  }
  /// <summary>
  /// Device processor for the scenario where FC acts as the master and actively polls the devices.
  /// It reuses the Incoming created by the base constructor and replaces the context with one whose
  /// Outgoing is a <c>TimeWindowWithActivePollingOutgoing</c>.
  /// </summary>
  /// <typeparam name="TRaw">Raw data type exchanged with the communicator.</typeparam>
  /// <typeparam name="TMessage">Parsed message type.</typeparam>
  [MetaPartsDescriptor(
      "lang-zh-cn:能自动发送消息的设备处理器lang-en-us:AutoPollingDeviceProcessor",
      "lang-zh-cn:自动发送消息型处理器lang-en-us:Auto send polling message processor for comm with devices")]
  public class HalfDuplexActivePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
  {
      /// <summary>
      /// Builds the base processor, then swaps in a context that uses the active-polling outgoing.
      /// </summary>
      /// <param name="handler">Handler that processes incoming messages.</param>
      /// <param name="communicator">Communicator used to receive and write messages.</param>
      /// <param name="autoPollingInterval">
      /// Passed unchanged to <c>TimeWindowWithActivePollingOutgoing</c>; its unit is not visible from this file.
      /// </param>
      /// <param name="services">Service provider forwarded to the base class and to the outgoing.</param>
      [ParamsJsonSchemas("HalfDuplexActivePollingDeviceProcessorCtorSchema")]
      public HalfDuplexActivePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, int autoPollingInterval, IServiceProvider services)
          : base(handler, communicator, services)
      {
          // Keep the Incoming the base constructor created, so both contexts share it.
          var sharedIncoming = this.Context.Incoming;
          var pollingOutgoing = new TimeWindowWithActivePollingOutgoing<TRaw, TMessage>(sharedIncoming, autoPollingInterval, services);
          this.Context = new Context<TRaw, TMessage>(this, handler, communicator, sharedIncoming, pollingOutgoing);

          // The new context has a new Outgoing, so its writes must be routed to the communicator here.
          this.Context.Outgoing.OnWriting += (sender, args) =>
          {
              if (args.ExtraControlParameter == null)
              {
                  this.Communicator.Write(args.Message);
                  return;
              }

              this.Communicator.Write(args.Message, args.ExtraControlParameter);
          };
      }
  }
  /// <summary>
  /// Device processor for the scenario where FC acts as the slave and is polled by the devices.
  /// It reuses the Incoming created by the base constructor and replaces the context with one whose
  /// Outgoing is a <c>TimeWindowWithNegativePollingOutgoing</c>.
  /// </summary>
  /// <typeparam name="TRaw">Raw data type exchanged with the communicator.</typeparam>
  /// <typeparam name="TMessage">Parsed message type.</typeparam>
  public class HalfDuplexNegativePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
  {
      /// <summary>
      /// Builds the base processor, then swaps in a context that uses the negative-polling outgoing.
      /// </summary>
      /// <param name="handler">Handler that processes incoming messages.</param>
      /// <param name="communicator">Communicator used to receive and write messages.</param>
      /// <param name="services">Service provider forwarded to the base class and to the outgoing.</param>
      public HalfDuplexNegativePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
          : base(handler, communicator, services)
      {
          // Keep the Incoming the base constructor created, so both contexts share it.
          var sharedIncoming = this.Context.Incoming;
          var pollingOutgoing = new TimeWindowWithNegativePollingOutgoing<TRaw, TMessage>(sharedIncoming, services);
          this.Context = new Context<TRaw, TMessage>(this, handler, communicator, sharedIncoming, pollingOutgoing);

          // The new context has a new Outgoing, so its writes must be routed to the communicator here.
          this.Context.Outgoing.OnWriting += (sender, args) =>
          {
              if (args.ExtraControlParameter == null)
              {
                  this.Communicator.Write(args.Message);
                  return;
              }

              this.Communicator.Write(args.Message, args.ExtraControlParameter);
          };
      }
  }
  190. }