123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- using Edge.Core.Parser;
- using Edge.Core.Parser.BinaryParser.MessageEntity;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- using Timer = System.Threading.Timer;
- using System.Collections.Concurrent;
- namespace Edge.Core.Processor
- {
/// <summary>
/// An <see cref="Outgoing{TRaw, TMessage}"/> that never sends on the caller's thread.
/// Every message handed to <c>Write</c>/<c>WriteAsync</c> is queued, and a dedicated polling
/// thread forwards at most one queued item to the base class per <see cref="PollingInterval"/>.
/// When the queue is empty at a tick, the message returned by <see cref="PollingMsgProducer"/>
/// (if any) is sent instead.
/// </summary>
/// <typeparam name="TRaw">Passed through unchanged to <see cref="Outgoing{TRaw, TMessage}"/>.</typeparam>
/// <typeparam name="TMessage">Type of the messages queued and sent by this instance.</typeparam>
public class TimeWindowWithActivePollingOutgoing<TRaw, TMessage> : Outgoing<TRaw, TMessage>, IDisposable where TMessage : MessageBase
{
    static readonly NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Communicator");

    /// <summary>
    /// Upper bound for the number of queued items before the oldest ones get dropped.
    /// Rationale (observed in the field): the underlying communicator can go silent for seconds
    /// and then flush a burst of bytes; the resulting burst of incoming messages makes the handlers
    /// call Write(...) many times in a row, which fills this queue very quickly.
    /// </summary>
    private const int maxQueueLength = 200;

    /// <summary>
    /// Separator placed between queued messages when the queue content is dumped to the log.
    /// </summary>
    private const string queueDumpSeparator = "\r\n====next=======\r\n";

    // A dedicated thread (instead of a timer or the thread pool) keeps the polling interval accurate.
    private readonly Thread pollingThread;

    // Guards 'disposed' and is pulsed by Dispose() to wake the polling thread out of its wait.
    private readonly object stopGate = new object();
    private bool disposed = false;

    // Holds either a TMessage, a Tuple<TMessage, object> or the 4-item WriteAsync tuple.
    private readonly ConcurrentQueue<object> outgoingMessageQueue = new ConcurrentQueue<object>();

    /// <summary>
    /// Optional factory that is invoked on every polling tick at which the queue is empty;
    /// a non-null result is sent through the base class.
    /// </summary>
    public Func<TMessage> PollingMsgProducer { get; set; }

    /// <summary>
    /// Gets the polling interval, in milliseconds, used by the internal polling thread.
    /// </summary>
    public int PollingInterval { get; }

    /// <summary>
    /// Creates the instance and immediately starts the polling thread.
    /// </summary>
    /// <param name="incoming">Forwarded to the base class constructor.</param>
    /// <param name="activelyPollingInterval">Polling interval in milliseconds, exposed as <see cref="PollingInterval"/>.</param>
    /// <param name="services">Forwarded to the base class constructor.</param>
    public TimeWindowWithActivePollingOutgoing(IIncoming<TMessage> incoming, int activelyPollingInterval, IServiceProvider services) : base(incoming, services)
    {
        this.PollingInterval = activelyPollingInterval;

        // NOTE: the thread body must be synchronous. An 'async' lambda passed to Thread becomes
        // 'async void': the thread would end at the first await and the loop would silently
        // continue on thread pool threads, defeating the purpose of a dedicated thread.
        this.pollingThread = new Thread(this.PollingLoop)
        {
            // Background, so an instance that was never disposed does not keep the process alive.
            IsBackground = true,
            Name = "TimeWindowWithActivePollingOutgoing.Polling",
        };
        this.pollingThread.Start();
    }

    /// <summary>
    /// Body of the polling thread: waits one interval, sends one item, repeats until disposed.
    /// </summary>
    private void PollingLoop()
    {
        while (this.WaitForNextTick())
        {
            try
            {
                this.SendNext();
            }
            catch (Exception ex)
            {
                // An unhandled exception here would terminate the whole process;
                // a single failed send must not stop the polling.
                logger.Error(ex, "TimeWindowWithActivePollingOutgoing, polling thread failed to send a message, will continue polling.");
            }
        }
    }

    /// <summary>
    /// Blocks for <see cref="PollingInterval"/> milliseconds, or until <see cref="Dispose"/> is called.
    /// </summary>
    /// <returns>true when the next send should be performed; false when this instance was disposed.</returns>
    private bool WaitForNextTick()
    {
        lock (this.stopGate)
        {
            if (!this.disposed)
            {
                // Releases the lock while waiting; only Dispose() pulses it.
                Monitor.Wait(this.stopGate, this.PollingInterval);
            }

            return !this.disposed;
        }
    }

    /// <summary>
    /// Sends the oldest queued item through the base class, or, when the queue is empty,
    /// the message produced by <see cref="PollingMsgProducer"/> (if any).
    /// </summary>
    private void SendNext()
    {
        if (this.outgoingMessageQueue.TryDequeue(out object pendingForWrite))
        {
            if (pendingForWrite is TMessage message)
            {
                base.Write(message);
            }
            else if (pendingForWrite is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncRequest)
            {
                base.WriteAsync(asyncRequest.Item1, asyncRequest.Item2, asyncRequest.Item3, asyncRequest.Item4);
            }
            else if (pendingForWrite is Tuple<TMessage, object> withParameter)
            {
                base.Write(withParameter.Item1, withParameter.Item2);
            }

            return;
        }

        // Read the property once so that a concurrent reset to null cannot cause a NullReferenceException.
        var pollingMessage = this.PollingMsgProducer?.Invoke();
        if (pollingMessage != null)
        {
            base.Write(pollingMessage);
        }
    }

    /// <summary>
    /// Produces the log text for one queued item, whichever of the three queued shapes it has.
    /// </summary>
    private static string DescribeQueuedItem(object queuedItem)
    {
        TMessage message = null;
        if (queuedItem is TMessage plain)
        {
            message = plain;
        }
        else if (queuedItem is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncRequest)
        {
            message = asyncRequest.Item1;
        }
        else if (queuedItem is Tuple<TMessage, object> withParameter)
        {
            message = withParameter.Item1;
        }

        return message == null ? "(null or unrecognized queue item)" : message.ToLogString();
    }

    /// <summary>
    /// Called after every enqueue. Above one third of <see cref="maxQueueLength"/> it only logs;
    /// above <see cref="maxQueueLength"/> it dumps the queue content to the log and drops the
    /// oldest third of the capacity.
    /// </summary>
    private void HandleQueueMaxLengthReachedError()
    {
        int queuedCount = this.outgoingMessageQueue.Count;

        // The overflow check must come first: any count above the maximum is also above
        // one third of it, so testing the 1/3 threshold first would make this branch unreachable.
        if (queuedCount > maxQueueLength)
        {
            /* let's dump inside elements to see what happened (one snapshot, in queue order).*/
            object[] snapshot = this.outgoingMessageQueue.ToArray();
            string logStr = string.Join(queueDumpSeparator, snapshot.Select(item => DescribeQueuedItem(item)));

            /* There is no better way to handle this case than dropping the oldest items. */
            for (int i = 0; i < maxQueueLength / 3; i++)
            {
                if (!this.outgoingMessageQueue.TryDequeue(out _))
                {
                    // The polling thread drained the queue in the meantime.
                    break;
                }
            }

            logger.Error("TimeWindowWithActivePollingOutgoing, Will clear 1/3 of the queue since Max message queue length: " + maxQueueLength
                + " reached, elements are: " + logStr);
        }
        else if (queuedCount > maxQueueLength / 3)
        {
            logger.Info("TimeWindowWithActivePollingOutgoing, queue reached its 1/3 capacity, just inform and will do nothing.");
        }
    }

    /// <summary>
    /// Queues a message; the actual write is performed later on the polling thread.
    /// </summary>
    /// <param name="message">Message to send.</param>
    public override void Write(TMessage message)
    {
        this.outgoingMessageQueue.Enqueue(message);
        this.HandleQueueMaxLengthReachedError();
    }

    /// <summary>
    /// Queues a request together with its response handling arguments; the actual
    /// base WriteAsync call is performed later on the polling thread.
    /// </summary>
    /// <param name="request">Request message to send.</param>
    /// <param name="responseCapture">Forwarded unchanged to the base WriteAsync.</param>
    /// <param name="callback">Forwarded unchanged to the base WriteAsync.</param>
    /// <param name="timeout">Forwarded unchanged to the base WriteAsync.</param>
    public override void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
    {
        this.outgoingMessageQueue.Enqueue(new Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>(request, responseCapture, callback, timeout));
        this.HandleQueueMaxLengthReachedError();
    }

    /// <summary>
    /// Queues a message together with its extra control parameter; the actual write
    /// is performed later on the polling thread.
    /// </summary>
    /// <param name="message">Message to send.</param>
    /// <param name="extraControlParameter">Forwarded unchanged to the base Write.</param>
    public override void Write(TMessage message, object extraControlParameter)
    {
        this.outgoingMessageQueue.Enqueue(new Tuple<TMessage, object>(message, extraControlParameter));
        this.HandleQueueMaxLengthReachedError();
    }

    /// <summary>
    /// Stops the polling thread. Does not wait for the thread to exit: a send that is already
    /// in progress is allowed to finish, but no further send is started. Safe to call repeatedly.
    /// </summary>
    public void Dispose()
    {
        lock (this.stopGate)
        {
            this.disposed = true;
            // Wake the polling thread immediately instead of letting it sleep out the interval.
            Monitor.PulseAll(this.stopGate);
        }
    }
}
- }
|