- using Edge.Core.Parser;
- using Edge.Core.Processor.Communicator;
- using Edge.Core.Processor.Dispatcher.Attributes;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Logging.Abstractions;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace Edge.Core.Processor
- {
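- /// <summary>
- /// Wires an ICommunicator to an IDeviceHandler: each message the communicator receives is
- /// published on Context.Incoming and passed to the handler, and each message written to
- /// Context.Outgoing is forwarded to the communicator.
- /// </summary>
- [MetaPartsDescriptor(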
- "lang-zh-cn:通用型设备处理器lang-en-us:GenericDeviceProcessor",
- "lang-zh-cn:通用型设备处理器lang-en-us:Generic processor for comm with devices")]
- public class GenericDeviceProcessor<TRaw, TMessage> : IDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
- {
- #region performance related
-
- private static ILogger perfLogger = NullLogger.Instance;
- private Stopwatch perfWatch_From_CommOnDataReceived_To_HandlerProcessed;
- private Stopwatch perfWatch_From_CommOnDataReceived_To_CommOnDataWriting;
- private ConcurrentQueue<string> perfPeriodLogStrings;
- private System.Timers.Timer perfPeriodToLogFileTimer;
- #endregion
- private object syncObject = new object();
- public string MetaConfigName { get; set; }
- public IContext<TRaw, TMessage> Context { get; protected set; }
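- // Initialized to an empty list so that callers never observe a null collection.
- public IList<IInterceptor<TRaw, TMessage>> Interceptors { get; } = new List<IInterceptor<TRaw, TMessage>>();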
- public ICommunicator<TRaw, TMessage> Communicator { get; }
- public GenericDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
- {
- if (services != null)
- {
- var loggerFactory = services.GetRequiredService<ILoggerFactory>();
- perfLogger = loggerFactory.CreateLogger("Performance");
- }
- if (perfLogger.IsEnabled(LogLevel.Trace))
- {
- // A new Stopwatch starts out stopped; the receive handler restarts both watches per message.
- this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed = new Stopwatch();
- this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting = new Stopwatch();
- this.perfPeriodLogStrings = new ConcurrentQueue<string>();
- // Flush the queued perf lines to the logger every 30 seconds.
- this.perfPeriodToLogFileTimer = new System.Timers.Timer(30 * 1000);
- this.perfPeriodToLogFileTimer.Elapsed += (a, b) =>
- {
- // Pause the timer while flushing so that Elapsed callbacks cannot overlap.
- this.perfPeriodToLogFileTimer.Stop();
- if (!perfLogger.IsEnabled(LogLevel.Trace))
- {
- // Trace logging is off: stop measuring and leave the timer stopped.
- this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Stop();
- this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Stop();
- return;
- }
- try
- {
- // Drain everything queued since the last flush into a single log entry.
- StringBuilder sb = null;
- while (this.perfPeriodLogStrings.TryDequeue(out var s))
- {
- if (sb == null) sb = new StringBuilder();
- sb.AppendLine(s);
- }
- if (sb != null && sb.Length > 0)
- perfLogger.LogTrace(sb.ToString());
- }
- finally
- {
- // Resume the timer even if logging failed.
- this.perfPeriodToLogFileTimer.Start();
- }
- };
- this.perfPeriodToLogFileTimer.Start();
- }
- this.Communicator = communicator;
- var incoming = new HistoryKeepIncoming<TMessage>(10);
- this.Context = new Context<TRaw, TMessage>(this, handler, communicator, incoming, new Outgoing<TRaw, TMessage>(incoming, services));
- this.Context.Outgoing.OnWriting += (s, a) =>
- {
- if (a.ExtraControlParameter != null)
- this.Communicator.Write(a.Message, a.ExtraControlParameter);
- else
- this.Communicator.Write(a.Message);
- };
- this.Communicator.OnDataReceived += (s, a) =>
- {
- // Handle one incoming message at a time.
- lock (this.syncObject)
- {
- if (perfLogger.IsEnabled(LogLevel.Trace))
- {
- this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Restart();
- this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Restart();
- }
- // Reset the flag, then publish the message. The handler runs only if nothing
- // set DisablePropagate while the message was being assigned.
- this.Context.Incoming.DisablePropagate = false;
- this.Context.Incoming.Message = a.Message;
- if (!this.Context.Incoming.DisablePropagate)
- handler.Process(this.Context);
- if (perfLogger.IsEnabled(LogLevel.Trace))
- this.perfPeriodLogStrings?.Enqueue(DateTime.Now.ToString("HH:mm:ss.fff") + " - "
- + " From_CommOnDataReceived_To_HandlerProcessed elapsed " + (this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.ElapsedMilliseconds ?? -1));
- }
- };
- this.Communicator.OnRawDataWriting += (s, a) =>
- {
- if (perfLogger.IsEnabled(LogLevel.Trace))
- {
- this.perfPeriodLogStrings?.Enqueue(DateTime.Now.ToString("HH:mm:ss.fff") + " - "
- + " From_CommOnDataReceived_To_CommOnDataWriting elapsed " + (this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.ElapsedMilliseconds ?? -1));
- }
- };
- }
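-
- /// <summary>
- /// Initializes the handler with the current context, then starts the communicator.
- /// </summary>
- /// <returns>The result reported by the communicator's Start call.</returns>
- public async Task<bool> Start()
- {
- this.Context.Handler.Init(this.Context);
- return await this.Communicator.Start();
- }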
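- /// <summary>
- /// Stops the perf timer, then disposes the communicator, the context and,
- /// if it implements IDisposable, the handler. Always completes with true.
- /// </summary>
- public Task<bool> Stop()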
- {
- this.perfPeriodToLogFileTimer?.Stop();
- this.Communicator.Dispose();
- this.Context.Dispose();
- if (this.Context.Handler is IDisposable dp)
- dp.Dispose();
- return Task.FromResult(true);
- }
- /// <summary>
- /// Forwards the given parameters to the handler's Test method.
- /// </summary>
- public Task Test(params object[] parameters)
- {
- return this.Context.Handler.Test(parameters);
- }
- }
-
- [MetaPartsDescriptor(
- "lang-zh-cn:能自动发送消息的设备处理器lang-en-us:AutoPollingDeviceProcessor",
- "lang-zh-cn:自动发送消息型处理器lang-en-us:Auto send polling message processor for comm with devices")]
- public class HalfDuplexActivePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
- {
- [ParamsJsonSchemas("HalfDuplexActivePollingDeviceProcessorCtorSchema")]
- public HalfDuplexActivePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, int autoPollingInterval, IServiceProvider services)
- : base(handler, communicator, services)
- {
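- // Replace the Context built by the base constructor: keep its Incoming, but swap in an
- // Outgoing that polls at autoPollingInterval. The base constructor's OnWriting subscription
- // was attached to the old Outgoing, so it is attached again to the new one below.
- base.Context = new Context<TRaw, TMessage>(this, handler, communicator, base.Context.Incoming,
- new TimeWindowWithActivePollingOutgoing<TRaw, TMessage>(base.Context.Incoming, autoPollingInterval, services));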
- this.Context.Outgoing.OnWriting += (s, a) =>
- {
- if (a.ExtraControlParameter != null)
- this.Communicator.Write(a.Message, a.ExtraControlParameter);
- else
- this.Communicator.Write(a.Message);
- };
- }
- }
-
- public class HalfDuplexNegativePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
- {
- public HalfDuplexNegativePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
- : base(handler, communicator, services)
- {
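- // Replace the Context built by the base constructor: keep its Incoming, but swap in a
- // TimeWindowWithNegativePollingOutgoing. The base constructor's OnWriting subscription
- // was attached to the old Outgoing, so it is attached again to the new one below.
- base.Context = new Context<TRaw, TMessage>(this, handler, communicator, base.Context.Incoming,
- new TimeWindowWithNegativePollingOutgoing<TRaw, TMessage>(base.Context.Incoming, services));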
- this.Context.Outgoing.OnWriting += (s, a) =>
- {
- if (a.ExtraControlParameter != null)
- this.Communicator.Write(a.Message, a.ExtraControlParameter);
- else
- this.Communicator.Write(a.Message);
- };
- }
- }
- }