using Edge.Core.Parser; using Edge.Core.Parser.BinaryParser.MessageEntity; using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using Timer = System.Threading.Timer; using System.Collections.Concurrent; namespace Edge.Core.Processor { /// <summary> /// With a build in timer to timely send msg from queue, /// so every messages written will be firstly queued here and then delayed to send. /// NOTE, /// </summary> /// <typeparam name="TRaw"></typeparam> /// <typeparam name="TMessage"></typeparam> public class TimeWindowWithActivePollingOutgoing<TRaw, TMessage> : Outgoing<TRaw, TMessage>, IDisposable where TMessage : MessageBase { static NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Communicator"); //use dedicated thread for make sure the polling interval accurate private Thread pollingThread; private bool disposed = false; /// <summary> /// I've see a case that underlying communicator suddenly muted for seconds(maybe remote device got stuck inside??), /// and later, bunch of bytes flushed in and got deserialzed into Messages and all were piped into Handlers, /// which surely will trigger the handler frequently call the Write(...) for a bunch of responses, thus full fill this queue very quickly. /// </summary> private const int maxQueueLength = 200; //private int activelyPollingInterval = 600; private ConcurrentQueue<object> outgoingMessageQueue = new ConcurrentQueue<object>(); public Func<TMessage> PollingMsgProducer { get; set; } /// <summary> /// by ms, Gets the polling interval for internal polling timer. /// </summary> public int PollingInterval { get; } //public TimeWindowWithActivePollingOutgoing(IIncoming<TMessage> incoming, int activelyPollingInterval) : base(incoming) //{ // this.PollingInterval = activelyPollingInterval; // this.polling = new Timer((__) => // { // var result = this.polling.Change(Timeout.Infinite, Timeout.Infinite); // if (!result) // logger.Warn("Polling timer failed to Change to Infinite!!!"); // if (this.outgoingMessageQueue.TryDequeue(out object pendingForWrite)) // { // if (pendingForWrite is TMessage) // { // base.Write(pendingForWrite as TMessage); // } // else if (pendingForWrite is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>) // { // var ___ = pendingForWrite as Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>; // base.WriteAsync(___.Item1, ___.Item2, ___.Item3, ___.Item4); // } // else if (pendingForWrite is Tuple<TMessage, object>) // { // var ___ = pendingForWrite as Tuple<TMessage, object>; // base.Write(___.Item1, ___.Item2); // } // } // else // if (this.PollingMsgProducer != null) { var m = this.PollingMsgProducer(); if (m != null) base.Write(m); } // result = this.polling.Change(activelyPollingInterval, activelyPollingInterval); // if (!result) // logger.Warn("Polling timer failed to Change to activelyPollingInterval!!!"); // }, null, 2000, activelyPollingInterval); //} public TimeWindowWithActivePollingOutgoing(IIncoming<TMessage> incoming, int activelyPollingInterval, IServiceProvider services) : base(incoming, services) { this.PollingInterval = activelyPollingInterval; this.pollingThread = new Thread(async () => { while (!this.disposed) { await Task.Delay(this.PollingInterval); if (this.outgoingMessageQueue.TryDequeue(out object pendingForWrite)) { if (pendingForWrite is TMessage) { base.Write(pendingForWrite as TMessage); } else if (pendingForWrite is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>) { var ___ = pendingForWrite as Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>; base.WriteAsync(___.Item1, ___.Item2, ___.Item3, ___.Item4); } else if (pendingForWrite is Tuple<TMessage, object>) { var ___ = pendingForWrite as Tuple<TMessage, object>; base.Write(___.Item1, ___.Item2); } } else if (this.PollingMsgProducer != null) { var m = this.PollingMsgProducer(); if (m != null) base.Write(m); } } }); this.pollingThread.Start(); //result = this.polling.Change(activelyPollingInterval, activelyPollingInterval); //if (!result) // logger.Warn("Polling timer failed to Change to activelyPollingInterval!!!"); //}, null, 2000, activelyPollingInterval); } private void HandleQueueMaxLengthReachedError() { if (this.outgoingMessageQueue.Count > maxQueueLength / 3) { logger.Info("TimeWindowWithActivePollingOutgoing, queue reached its 1/3 capacity, just inform and will do nothing."); } else if (this.outgoingMessageQueue.Count > maxQueueLength) { /* let's dump inside elements to see what happened.*/ string logStr = ""; var msgLogStrings = this.outgoingMessageQueue.OfType<TMessage>().Select(s => s.ToLogString()); if (msgLogStrings.Any()) logStr += msgLogStrings.Aggregate((acc, n) => acc + "\r\n====next=======\r\n" + n); var tuple4Strings = this.outgoingMessageQueue.OfType<Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>>().Select(s => s.Item1.ToLogString()); if (tuple4Strings.Any()) logStr += tuple4Strings.Aggregate((acc, n) => acc + "\r\n" + n); var tuple2Strings = this.outgoingMessageQueue.OfType<Tuple<TMessage, object>>().Select(s => s.Item1.ToLogString()); if (tuple2Strings.Any()) logStr += tuple2Strings.Aggregate((acc, n) => acc + "\r\n" + n); /*I have no better way to handle this case but clear 1/3 of(oldest items) the queue*/ for (int i = 0; i < maxQueueLength / 3; i++) { object _; this.outgoingMessageQueue.TryDequeue(out _); } //this.outgoingMessageQueue.Clear(); logger.Error("TimeWindowWithActivePollingOutgoing, Will clear 1/3 of the queue since Max message queue length: " + maxQueueLength + " reached, elements are: " + logStr); } } /// <summary> /// Write a msg to queue, and actual write action will be performed on another thread. /// </summary> /// <param name="message"></param> public override void Write(TMessage message) { this.outgoingMessageQueue.Enqueue(message); this.HandleQueueMaxLengthReachedError(); } /// <summary> /// actual write action will be performed on another thread. /// </summary> /// <param name="request"></param> /// <param name="responseCapture"></param> /// <param name="callback"></param> /// <param name="timeout"></param> public override void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout) { this.outgoingMessageQueue.Enqueue(new Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>(request, responseCapture, callback, timeout)); this.HandleQueueMaxLengthReachedError(); } /// <summary> /// actual write action will be performed on another thread. /// </summary> /// <param name="message"></param> /// <param name="extraControlParameter"></param> public override void Write(TMessage message, object extraControlParameter) { this.outgoingMessageQueue.Enqueue(new Tuple<TMessage, object>(message, extraControlParameter)); this.HandleQueueMaxLengthReachedError(); } public void Dispose() { this.disposed = true; } } }