123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- using Edge.Core.MqttClient;
- using Edge.Core.Parser;
- using Edge.Core.Parser.BinaryParser.MessageEntity;
- using Edge.Core.Processor.Communicator;
- using Edge.Core.Processor.Dispatcher.Attributes;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Logging.Abstractions;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace Edge.Core.Processor
- {
- [MetaPartsDescriptor(
- "lang-zh-cn:通用型设备处理器lang-en-us:GenericDeviceProcessor",
- "lang-zh-cn:通用型设备处理器lang-en-us:Generic processor for comm with devices")]
- public class GenericDeviceProcessor<TRaw, TMessage> : IDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
- {
- #region performance related
- //private static NLog.Logger perfLogger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Performance");
- private static ILogger perfLogger = NullLogger.Instance;
- private Stopwatch perfWatch_From_CommOnDataReceived_To_HandlerProcessed;
- private Stopwatch perfWatch_From_CommOnDataReceived_To_CommOnDataWriting;
- private ConcurrentQueue<string> perfPeriodLogStrings;
- private System.Timers.Timer perfPeriodToLogFileTimer;
- #endregion
- private object syncObject = new object();
- public string MetaConfigName { get; set; }
- public IContext<TRaw, TMessage> Context { get; protected set; }
- public IList<IInterceptor<TRaw, TMessage>> Interceptors { get; }
- public ICommunicator<TRaw, TMessage> Communicator { get; }
- public GenericDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
- {
- if (services != null)
- {
- var loggerFactory = services.GetRequiredService<ILoggerFactory>();
- perfLogger = loggerFactory.CreateLogger("Performance");
- }
- if (perfLogger.IsEnabled(LogLevel.Trace))
- {
- this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed = new Stopwatch();
- this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed.Stop();
- this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting = new Stopwatch();
- this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting.Stop();
- this.perfPeriodLogStrings = new ConcurrentQueue<string>();
- this.perfPeriodToLogFileTimer = new System.Timers.Timer(30 * 1000);// flush to log every 30 seconds
- this.perfPeriodToLogFileTimer.Elapsed += (a, b) =>
- {
- try
- {
- this.perfPeriodToLogFileTimer.Stop();
- if (!perfLogger.IsEnabled(LogLevel.Trace))
- {
- this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Stop();
- this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Stop();
- return;
- }
- }
- catch { }
- finally { }
- try
- {
- StringBuilder sb = null;
- string s = null;
- while (this.perfPeriodLogStrings.TryDequeue(out s))
- {
- if (sb == null) sb = new StringBuilder();
- sb.Append(s + Environment.NewLine);
- }
- if (sb != null && sb.Length > 0)
- perfLogger.LogTrace(sb.ToString());
- }
- finally
- {
- this.perfPeriodToLogFileTimer.Start();
- }
- };
- this.perfPeriodToLogFileTimer.Start();
- }
- this.Communicator = communicator;
- var incoming = new HistoryKeepIncoming<TMessage>(10);
- Outgoing<TRaw, TMessage> outgoing = new Outgoing<TRaw, TMessage>(incoming, services);
- this.Context = new Context<TRaw, TMessage>(this, handler, communicator, incoming, outgoing);
- this.Context.Outgoing.OnWriting += (s, a) =>
- {
- if (a.ExtraControlParameter != null)
- this.Communicator.Write(a.Message, a.ExtraControlParameter);
- else
- this.Communicator.Write(a.Message);
- };
- this.Communicator.OnConnected += (communicator, a) =>
- {
- outgoing.OnConnect();
- handler.SendQRCodeAsync();
- //if (communicator is IClinet)
- //{
- // IClinet clinet = (IClinet)communicator;
- // handler.SetTcpClient(clinet?.GetTcpClient(), clinet?.GetServerPort());
- // handler.SendQRCodeAsync();
- //}
- };
- this.Communicator.OnDisconnected += (communicator, a) =>
- {
- outgoing.OnDisconnect();
- //handler.OnTcpDisconnect();
- };
- this.Communicator.OnDataReceived += (s, a) =>
- {
- lock (this.syncObject)
- {
- if (perfLogger.IsEnabled(LogLevel.Trace))
- {
- this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Restart();
- this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Restart();
- }
- this.Context.Incoming.DisablePropagate = false;
- this.Context.Incoming.Message = a.Message;
- if (!this.Context.Incoming.DisablePropagate)
- handler.Process(this.Context);
- if (perfLogger.IsEnabled(LogLevel.Trace))
- this.perfPeriodLogStrings?.Enqueue(DateTime.Now.ToString("HH:mm:ss.fff") + " - "
- + " From_CommOnDataReceived_To_HandlerProcessed elapsed " + (this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.ElapsedMilliseconds ?? -1));
- }
- };
- this.Communicator.OnRawDataWriting += (s, a) =>
- {
- if (perfLogger.IsEnabled(LogLevel.Trace))
- {
- this.perfPeriodLogStrings?.Enqueue(DateTime.Now.ToString("HH:mm:ss.fff") + " - "
- + " From_CommOnDataReceived_To_CommOnDataWriting elapsed " + (this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.ElapsedMilliseconds ?? -1));
- }
- };
- //------------------------- MQTT ---------------------------------
- MqttClientService mqttClientService = new MqttClientService();
- mqttClientService.OnConnect += (s, a) => { };
- mqttClientService.OnDisconnect += (s, a) => { };
- mqttClientService.OnApplicationMessageReceived += (s, a) =>
- {
- var message = Encoding.UTF8.GetString(a.ApplicationMessage.Payload);
- handler.OnReceiveMqttMessage(message);
- };
-
- mqttClientService.Start();
- }
- //public void Dispose()
- //{
- // this.Communicator.Dispose();
- //}
- public async Task<bool> Start()
- {
- this.Context.Handler.Init(this.Context);
- var r = await this.Communicator.Start();
- return r;
- }
- public Task<bool> Stop()
- {
- this.perfPeriodToLogFileTimer?.Stop();
- this.Communicator.Dispose();
- this.Context.Dispose();
- if (this.Context.Handler is IDisposable dp)
- dp.Dispose();
- return Task.FromResult(true);
- }
- public Task Test(params object[] parameters)
- {
- return this.Context.Handler.Test(parameters);
- }
- }
- /// <summary>
- /// used for scenario of FC as master to actively polling devices.
- /// </summary>
- /// <typeparam name="TRaw"></typeparam>
- /// <typeparam name="TMessage"></typeparam>
- [MetaPartsDescriptor(
- "lang-zh-cn:能自动发送消息的设备处理器lang-en-us:AutoPollingDeviceProcessor",
- "lang-zh-cn:自动发送消息型处理器lang-en-us:Auto send polling message processor for comm with devices")]
- public class HalfDuplexActivePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
- {
- [ParamsJsonSchemas("HalfDuplexActivePollingDeviceProcessorCtorSchema")]
- public HalfDuplexActivePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, int autoPollingInterval, IServiceProvider services)
- : base(handler, communicator, services)
- {
- base.Context = new Context<TRaw, TMessage>(this, handler, communicator, base.Context.Incoming,
- new TimeWindowWithActivePollingOutgoing<TRaw, TMessage>(base.Context.Incoming, autoPollingInterval, services));
- this.Context.Outgoing.OnWriting += (s, a) =>
- {
- if (a.ExtraControlParameter != null)
- this.Communicator.Write(a.Message, a.ExtraControlParameter);
- else
- this.Communicator.Write(a.Message);
- };
- }
- }
- /// <summary>
- /// used for scenario of FC as slave to receive polling from devices.
- /// </summary>
- /// <typeparam name="TRaw"></typeparam>
- /// <typeparam name="TMessage"></typeparam>
- public class HalfDuplexNegativePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
- {
- public HalfDuplexNegativePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
- : base(handler, communicator, services)
- {
- base.Context = new Context<TRaw, TMessage>(this, handler, communicator, base.Context.Incoming,
- new TimeWindowWithNegativePollingOutgoing<TRaw, TMessage>(base.Context.Incoming, services));
- this.Context.Outgoing.OnWriting += (s, a) =>
- {
- if (a.ExtraControlParameter != null)
- this.Communicator.Write(a.Message, a.ExtraControlParameter);
- else
- this.Communicator.Write(a.Message);
- };
- }
- }
- }
|