using Edge.Core.Parser;
using Edge.Core.Parser.BinaryParser.MessageEntity;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Timer = System.Threading.Timer;
using System.Collections.Concurrent;

namespace Edge.Core.Processor
{
    // NOTE(review): the generic type arguments in this file (TRaw/TMessage on the class and base,
    // IIncoming<TMessage>, and the Func/Action shapes of WriteAsync) were reconstructed from how the
    // values are used below; confirm them against the declarations of Outgoing and IIncoming.

    /// <summary>
    /// An outgoing channel with a built-in polling loop: every message written is first queued here
    /// and is forwarded to the base class later, at most one item per polling interval.
    /// When the queue is empty at a polling tick, <see cref="PollingMsgProducer"/> (if set) is asked
    /// for a message to send instead.
    /// </summary>
    /// <typeparam name="TRaw">First type argument passed through to the base Outgoing class.</typeparam>
    /// <typeparam name="TMessage">Message type; must derive from MessageBase.</typeparam>
    public class TimeWindowWithActivePollingOutgoing<TRaw, TMessage> : Outgoing<TRaw, TMessage>, IDisposable
        where TMessage : MessageBase
    {
        private static readonly NLog.Logger logger =
            NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Communicator");

        /// <summary>
        /// Upper bound for the pending queue. It has been observed that the underlying communicator can
        /// go silent for seconds and then flush a burst of bytes; the resulting burst of messages makes
        /// the handlers call Write(...) many times in a row and fills this queue very quickly.
        /// </summary>
        private const int maxQueueLength = 200;

        // A dedicated thread (not a timer or the thread pool) keeps the polling interval steady.
        private readonly Thread pollingThread;

        // Written by Dispose() and read by the polling thread, hence volatile.
        private volatile bool disposed = false;

        // Holds one of three item shapes, matching the three write overloads:
        //   TMessage, Tuple<TMessage, object>, or the 4-item WriteAsync tuple.
        private readonly ConcurrentQueue<object> outgoingMessageQueue = new ConcurrentQueue<object>();

        /// <summary>
        /// Optional factory invoked at a polling tick when nothing is queued. A non-null result is
        /// written through the base class; a null result sends nothing for that tick.
        /// </summary>
        public Func<TMessage> PollingMsgProducer { get; set; }

        /// <summary>
        /// Gets the polling interval, in milliseconds, used by the internal polling loop.
        /// </summary>
        public int PollingInterval { get; }

        /// <summary>
        /// Creates the channel and starts the polling thread immediately.
        /// </summary>
        /// <param name="incoming">Passed through to the base class constructor.</param>
        /// <param name="activelyPollingInterval">Polling interval in milliseconds.</param>
        /// <param name="services">Passed through to the base class constructor.</param>
        public TimeWindowWithActivePollingOutgoing(IIncoming<TMessage> incoming, int activelyPollingInterval, IServiceProvider services)
            : base(incoming, services)
        {
            this.PollingInterval = activelyPollingInterval;

            // The loop is deliberately synchronous. An async lambda handed to Thread becomes
            // "async void": the thread would end at the first await, the loop would hop onto
            // thread-pool threads, and any exception would go unobserved and could kill the process.
            this.pollingThread = new Thread(this.PollingLoop)
            {
                // Background, so the polling loop never keeps the process alive on its own.
                IsBackground = true,
                Name = "TimeWindowWithActivePollingOutgoing.Polling",
            };
            this.pollingThread.Start();
        }

        /// <summary>
        /// Body of the polling thread: sleep one interval, then send at most one item,
        /// until <see cref="Dispose"/> has been called.
        /// </summary>
        private void PollingLoop()
        {
            while (!this.disposed)
            {
                Thread.Sleep(this.PollingInterval);

                // Dispose() may have been called while sleeping; do not send after that.
                if (this.disposed)
                {
                    break;
                }

                try
                {
                    this.SendNext();
                }
                catch (Exception ex)
                {
                    // One failed write must not terminate polling for the lifetime of the channel.
                    logger.Error("TimeWindowWithActivePollingOutgoing, polling loop failed to send: " + ex);
                }
            }
        }

        /// <summary>
        /// Sends the oldest queued item through the base class, or, when the queue is empty,
        /// a message obtained from <see cref="PollingMsgProducer"/>.
        /// </summary>
        private void SendNext()
        {
            if (this.outgoingMessageQueue.TryDequeue(out object pendingForWrite))
            {
                if (pendingForWrite is TMessage plainMessage)
                {
                    base.Write(plainMessage);
                }
                else if (pendingForWrite is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncRequest)
                {
                    base.WriteAsync(asyncRequest.Item1, asyncRequest.Item2, asyncRequest.Item3, asyncRequest.Item4);
                }
                else if (pendingForWrite is Tuple<TMessage, object> controlledMessage)
                {
                    base.Write(controlledMessage.Item1, controlledMessage.Item2);
                }

                return;
            }

            // Read the property once so a concurrent setter cannot null it between check and call.
            Func<TMessage> producer = this.PollingMsgProducer;
            if (producer != null)
            {
                TMessage produced = producer();
                if (produced != null)
                {
                    base.Write(produced);
                }
            }
        }

        /// <summary>
        /// Called after every enqueue. Above full capacity it logs the queued messages and drops the
        /// oldest third of the capacity; above one third of capacity it only logs an informational line.
        /// </summary>
        private void HandleQueueMaxLengthReachedError()
        {
            int queuedCount = this.outgoingMessageQueue.Count;

            // The full-capacity check must come first: anything above maxQueueLength is also above
            // maxQueueLength / 3, so testing the smaller threshold first would make this unreachable.
            if (queuedCount > maxQueueLength)
            {
                // Dump the elements (from a single snapshot) to see what happened.
                object[] snapshot = this.outgoingMessageQueue.ToArray();
                string logStr =
                    string.Join(
                        "\r\n====next=======\r\n",
                        snapshot.OfType<TMessage>().Select(m => m.ToLogString()))
                    + string.Join(
                        "\r\n",
                        snapshot.OfType<Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>>()
                            .Select(t => t.Item1.ToLogString()))
                    + string.Join(
                        "\r\n",
                        snapshot.OfType<Tuple<TMessage, object>>().Select(t => t.Item1.ToLogString()));

                // There is no better way to handle this case than dropping the oldest items.
                for (int i = 0; i < maxQueueLength / 3; i++)
                {
                    this.outgoingMessageQueue.TryDequeue(out _);
                }

                logger.Error("TimeWindowWithActivePollingOutgoing, Will clear 1/3 of the queue since Max message queue length: "
                    + maxQueueLength + " reached, elements are: " + logStr);
            }
            else if (queuedCount > maxQueueLength / 3)
            {
                logger.Info("TimeWindowWithActivePollingOutgoing, queue reached its 1/3 capacity, just inform and will do nothing.");
            }
        }

        /// <summary>
        /// Queues a message; the actual write is performed later on the polling thread.
        /// </summary>
        /// <param name="message">Message to queue.</param>
        public override void Write(TMessage message)
        {
            this.outgoingMessageQueue.Enqueue(message);
            this.HandleQueueMaxLengthReachedError();
        }

        /// <summary>
        /// Queues an asynchronous write request; the actual base WriteAsync call is performed later
        /// on the polling thread with the same four arguments.
        /// </summary>
        /// <param name="request">Message to queue.</param>
        /// <param name="responseCapture">Forwarded unchanged to the base WriteAsync.</param>
        /// <param name="callback">Forwarded unchanged to the base WriteAsync.</param>
        /// <param name="timeout">Forwarded unchanged to the base WriteAsync.</param>
        public override void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
        {
            this.outgoingMessageQueue.Enqueue(
                new Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>(
                    request, responseCapture, callback, timeout));
            this.HandleQueueMaxLengthReachedError();
        }

        /// <summary>
        /// Queues a message together with an extra control parameter; the actual write is performed
        /// later on the polling thread.
        /// </summary>
        /// <param name="message">Message to queue.</param>
        /// <param name="extraControlParameter">Forwarded unchanged to the base Write overload.</param>
        public override void Write(TMessage message, object extraControlParameter)
        {
            this.outgoingMessageQueue.Enqueue(new Tuple<TMessage, object>(message, extraControlParameter));
            this.HandleQueueMaxLengthReachedError();
        }

        /// <summary>
        /// Signals the polling loop to stop. The loop exits after its current sleep ends;
        /// items still queued at that point are not sent.
        /// </summary>
        public void Dispose()
        {
            this.disposed = true;
        }
    }
}