using Edge.Core.Parser;
using Edge.Core.Processor.Communicator;
using Edge.Core.Processor.Dispatcher.Attributes;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// NOTE(review): the text this file was reviewed from had every generic argument list
// (and the XML doc tags) stripped. The <TRaw, TMessage> / <TMessage> / <bool> argument
// lists below were reconstructed from usage and from the visible
// "where TMessage : MessageBase" constraint. Confirm each of them against the real
// declarations in Edge.Core (IDeviceProcessor, IContext, IInterceptor, ICommunicator,
// IDeviceHandler, Context, HistoryKeepIncoming, Outgoing, TimeWindowWith*Outgoing)
// before merging.

namespace Edge.Core.Processor
{
    /// <summary>
    /// Glues a communicator to a device handler: every message received from the
    /// communicator is published on <see cref="Context"/>.Incoming and handed to the
    /// handler, and every message written to <see cref="Context"/>.Outgoing is forwarded
    /// to the communicator.
    /// When the "Performance" logger has Trace enabled at construction time, elapsed
    /// times are sampled and flushed to that logger periodically.
    /// </summary>
    /// <typeparam name="TRaw">Raw data type of the communicator (presumed; confirm).</typeparam>
    /// <typeparam name="TMessage">Parsed message type.</typeparam>
    [MetaPartsDescriptor(
        "lang-zh-cn:通用型设备处理器lang-en-us:GenericDeviceProcessor",
        "lang-zh-cn:通用型设备处理器lang-en-us:Generic processor for comm with devices")]
    public class GenericDeviceProcessor<TRaw, TMessage> : IDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
    {
        #region performance related

        /// <summary>Interval between two flushes of the collected performance lines.</summary>
        private const double PerfFlushIntervalMilliseconds = 30 * 1000;

        // NOTE(review): this field is static, yet it is reassigned by every constructor
        // call that receives a service provider. Kept as-is to preserve behavior.
        private static ILogger perfLogger = NullLogger.Instance;

        private Stopwatch perfWatch_From_CommOnDataReceived_To_HandlerProcessed;
        private Stopwatch perfWatch_From_CommOnDataReceived_To_CommOnDataWriting;
        private ConcurrentQueue<string> perfPeriodLogStrings;
        private System.Timers.Timer perfPeriodToLogFileTimer;

        // Set by Stop() so that a flush already in progress does not re-arm the timer.
        private volatile bool perfFlushStopped;

        #endregion

        // Serializes the processing of incoming messages.
        private readonly object syncObject = new object();

        public string MetaConfigName { get; set; }

        public IContext<TRaw, TMessage> Context { get; protected set; }

        // NOTE(review): never assigned anywhere in this file, so it is always null here.
        public IList<IInterceptor<TRaw, TMessage>> Interceptors { get; }

        public ICommunicator<TRaw, TMessage> Communicator { get; }

        /// <summary>
        /// Builds the processing context and wires the communicator events.
        /// </summary>
        /// <param name="handler">Handler that processes each incoming message.</param>
        /// <param name="communicator">Communicator to read from and write to.</param>
        /// <param name="services">
        /// Optional service provider. When not null it must be able to resolve an
        /// <see cref="ILoggerFactory"/>, which is used to create the "Performance" logger.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="communicator"/> is null.</exception>
        public GenericDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
        {
            // Fail early with a meaningful exception; the event subscriptions below
            // would otherwise fail with a NullReferenceException.
            if (communicator == null)
                throw new ArgumentNullException(nameof(communicator));

            if (services != null)
            {
                var loggerFactory = services.GetRequiredService<ILoggerFactory>();
                perfLogger = loggerFactory.CreateLogger("Performance");
            }

            if (perfLogger.IsEnabled(LogLevel.Trace))
                this.InitializePerformanceLogging();

            this.Communicator = communicator;
            var incoming = new HistoryKeepIncoming<TMessage>(10);
            this.Context = new Context<TRaw, TMessage>(this, handler, communicator, incoming, new Outgoing<TRaw, TMessage>(incoming, services));
            this.ForwardOutgoingToCommunicator();

            this.Communicator.OnDataReceived += (s, a) =>
            {
                lock (this.syncObject)
                {
                    if (perfLogger.IsEnabled(LogLevel.Trace))
                    {
                        this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Restart();
                        this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Restart();
                    }

                    // Setting Incoming.Message may flip DisablePropagate back to true
                    // (it is re-read right after the assignment), in which case the
                    // handler is skipped for this message.
                    this.Context.Incoming.DisablePropagate = false;
                    this.Context.Incoming.Message = a.Message;
                    if (!this.Context.Incoming.DisablePropagate)
                        handler.Process(this.Context);

                    this.RecordPerfSample(
                        "From_CommOnDataReceived_To_HandlerProcessed",
                        this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed);
                }
            };

            this.Communicator.OnRawDataWriting += (s, a) =>
            {
                this.RecordPerfSample(
                    "From_CommOnDataReceived_To_CommOnDataWriting",
                    this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting);
            };
        }

        /// <summary>
        /// Subscribes to the OnWriting event of the current <see cref="Context"/>'s
        /// Outgoing and forwards every message to the communicator, passing the extra
        /// control parameter along when one is supplied.
        /// Must be called again whenever <see cref="Context"/> is replaced.
        /// </summary>
        protected void ForwardOutgoingToCommunicator()
        {
            this.Context.Outgoing.OnWriting += (s, a) =>
            {
                if (a.ExtraControlParameter != null)
                    this.Communicator.Write(a.Message, a.ExtraControlParameter);
                else
                    this.Communicator.Write(a.Message);
            };
        }

        /// <summary>Creates the stopwatches, the line queue and the periodic flush timer.</summary>
        private void InitializePerformanceLogging()
        {
            this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed = new Stopwatch();
            this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting = new Stopwatch();
            this.perfPeriodLogStrings = new ConcurrentQueue<string>();

            // One-shot timer that is re-armed after each flush, so two flushes never overlap.
            this.perfPeriodToLogFileTimer = new System.Timers.Timer(PerfFlushIntervalMilliseconds) { AutoReset = false };
            this.perfPeriodToLogFileTimer.Elapsed += (sender, e) => this.FlushPerformanceLog();
            this.perfPeriodToLogFileTimer.Start();
        }

        /// <summary>
        /// Writes all queued performance lines to the performance logger as one Trace
        /// entry, then re-arms the flush timer.
        /// </summary>
        private void FlushPerformanceLog()
        {
            if (!perfLogger.IsEnabled(LogLevel.Trace))
            {
                // Tracing was switched off: stop measuring and do not re-arm the timer.
                this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Stop();
                this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Stop();
                return;
            }

            try
            {
                StringBuilder buffer = null;
                while (this.perfPeriodLogStrings.TryDequeue(out var line))
                {
                    if (buffer == null)
                        buffer = new StringBuilder();
                    buffer.Append(line).Append(Environment.NewLine);
                }

                if (buffer != null && buffer.Length > 0)
                    perfLogger.LogTrace(buffer.ToString());
            }
            finally
            {
                this.RearmPerfTimer();
            }
        }

        /// <summary>Schedules the next flush unless the processor has been stopped.</summary>
        private void RearmPerfTimer()
        {
            if (this.perfFlushStopped)
                return;

            try
            {
                this.perfPeriodToLogFileTimer.Start();
            }
            catch (ObjectDisposedException)
            {
                // Stop() disposed the timer between the check above and Start(); nothing to re-arm.
            }
        }

        /// <summary>
        /// Queues one performance line for the next flush. Does nothing when Trace is
        /// disabled or performance logging was not initialized.
        /// </summary>
        /// <param name="stageName">Name of the measured stage, written verbatim into the line.</param>
        /// <param name="watch">Stopwatch to read; -1 is recorded when it is null.</param>
        private void RecordPerfSample(string stageName, Stopwatch watch)
        {
            if (!perfLogger.IsEnabled(LogLevel.Trace))
                return;

            this.perfPeriodLogStrings?.Enqueue(
                DateTime.Now.ToString("HH:mm:ss.fff") + " - " + " " + stageName + " elapsed "
                + (watch?.ElapsedMilliseconds ?? -1));
        }

        /// <summary>Initializes the handler with the context, then starts the communicator.</summary>
        /// <returns>The result of starting the communicator.</returns>
        public async Task<bool> Start()
        {
            this.Context.Handler.Init(this.Context);
            return await this.Communicator.Start();
        }

        /// <summary>
        /// Stops performance logging and disposes the communicator, the context and,
        /// when it is disposable, the handler. Each disposal is attempted even if a
        /// previous one throws.
        /// </summary>
        /// <returns>A completed task whose result is always true.</returns>
        public Task<bool> Stop()
        {
            this.perfFlushStopped = true;
            this.perfPeriodToLogFileTimer?.Stop();
            this.perfPeriodToLogFileTimer?.Dispose();

            try
            {
                this.Communicator.Dispose();
            }
            finally
            {
                try
                {
                    this.Context.Dispose();
                }
                finally
                {
                    if (this.Context.Handler is IDisposable disposableHandler)
                        disposableHandler.Dispose();
                }
            }

            return Task.FromResult(true);
        }

        /// <summary>Delegates to the handler's Test method.</summary>
        /// <param name="parameters">Parameters passed through to the handler unchanged.</param>
        public Task Test(params object[] parameters)
        {
            return this.Context.Handler.Test(parameters);
        }
    }

    /// <summary>
    /// Used for the scenario of FC as master, actively polling devices.
    /// </summary>
    /// <typeparam name="TRaw">Raw data type of the communicator (presumed; confirm).</typeparam>
    /// <typeparam name="TMessage">Parsed message type.</typeparam>
    [MetaPartsDescriptor(
        "lang-zh-cn:能自动发送消息的设备处理器lang-en-us:AutoPollingDeviceProcessor",
        "lang-zh-cn:自动发送消息型处理器lang-en-us:Auto send polling message processor for comm with devices")]
    public class HalfDuplexActivePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
    {
        /// <summary>
        /// Replaces the context built by the base constructor with one whose outgoing
        /// pipeline is a <c>TimeWindowWithActivePollingOutgoing</c>, reusing the same Incoming.
        /// </summary>
        /// <param name="handler">Handler that processes each incoming message.</param>
        /// <param name="communicator">Communicator to read from and write to.</param>
        /// <param name="autoPollingInterval">Passed to the active polling outgoing pipeline.</param>
        /// <param name="services">Optional service provider, see the base constructor.</param>
        [ParamsJsonSchemas("HalfDuplexActivePollingDeviceProcessorCtorSchema")]
        public HalfDuplexActivePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, int autoPollingInterval, IServiceProvider services)
            : base(handler, communicator, services)
        {
            // NOTE(review): the context and outgoing pipeline created by the base
            // constructor are dropped here without being disposed — confirm that is harmless.
            base.Context = new Context<TRaw, TMessage>(this, handler, communicator, base.Context.Incoming,
                new TimeWindowWithActivePollingOutgoing<TRaw, TMessage>(base.Context.Incoming, autoPollingInterval, services));
            this.ForwardOutgoingToCommunicator();
        }
    }

    /// <summary>
    /// Used for the scenario of FC as slave, receiving polling from devices.
    /// </summary>
    /// <typeparam name="TRaw">Raw data type of the communicator (presumed; confirm).</typeparam>
    /// <typeparam name="TMessage">Parsed message type.</typeparam>
    public class HalfDuplexNegativePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
    {
        /// <summary>
        /// Replaces the context built by the base constructor with one whose outgoing
        /// pipeline is a <c>TimeWindowWithNegativePollingOutgoing</c>, reusing the same Incoming.
        /// </summary>
        /// <param name="handler">Handler that processes each incoming message.</param>
        /// <param name="communicator">Communicator to read from and write to.</param>
        /// <param name="services">Optional service provider, see the base constructor.</param>
        public HalfDuplexNegativePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
            : base(handler, communicator, services)
        {
            // NOTE(review): the context and outgoing pipeline created by the base
            // constructor are dropped here without being disposed — confirm that is harmless.
            base.Context = new Context<TRaw, TMessage>(this, handler, communicator, base.Context.Incoming,
                new TimeWindowWithNegativePollingOutgoing<TRaw, TMessage>(base.Context.Incoming, services));
            this.ForwardOutgoingToCommunicator();
        }
    }
}