123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- using Edge.Core.Parser;
- using Edge.Core.Parser.BinaryParser.MessageEntity;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- using Timer = System.Threading.Timer;
- using System.Collections.Concurrent;
- namespace Edge.Core.Processor
- {
-
-
-
-
-
-
-
/// <summary>
/// An <see cref="Outgoing{TRaw, TMessage}"/> that never sends on the caller's thread.
/// Every Write/WriteAsync call is parked in an in-memory queue, and a dedicated background
/// thread forwards at most one item to the base implementation per <see cref="PollingInterval"/>.
/// When the queue is empty at a tick, the message returned by <see cref="PollingMsgProducer"/>
/// (if any) is sent instead.
/// </summary>
/// <typeparam name="TRaw">Raw type parameter passed through to the base class.</typeparam>
/// <typeparam name="TMessage">Message type handled by this outgoing channel.</typeparam>
public class TimeWindowWithActivePollingOutgoing<TRaw, TMessage> : Outgoing<TRaw, TMessage>, IDisposable where TMessage : MessageBase
{
    // NOTE(review): a static field in a generic class exists once per closed generic type, so
    // LoadConfiguration("nlog.config") runs once for every distinct <TRaw, TMessage> pair in use.
    private static readonly NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Communicator");

    /// <summary>Queue length above which the oldest third of the queue is discarded.</summary>
    private const int maxQueueLength = 200;

    /// <summary>Separator placed between queued messages when the queue content is dumped to the log.</summary>
    private const string queuedMessageLogSeparator = "\r\n====next=======\r\n";

    private readonly Thread pollingThread;

    // Doubles as an interruptible sleep for the polling thread: Dispose() sets it so the
    // thread wakes immediately instead of finishing the current interval.
    private readonly ManualResetEventSlim stopSignal = new ManualResetEventSlim(false);

    // volatile: written by the disposing thread, read by the polling thread.
    private volatile bool disposed = false;

    // Holds one of three shapes, matching the three write overloads:
    //   TMessage                                            -> Write(message)
    //   Tuple<TMessage, Func<..>, Action<..>, int>          -> WriteAsync(...)
    //   Tuple<TMessage, object>                             -> Write(message, extraControlParameter)
    private readonly ConcurrentQueue<object> outgoingMessageQueue = new ConcurrentQueue<object>();

    /// <summary>
    /// Optional factory invoked on a tick when nothing is queued; a non-null result is sent
    /// through the base <c>Write</c>. Called on the polling thread.
    /// </summary>
    public Func<TMessage> PollingMsgProducer { get; set; }

    /// <summary>Time between two polling ticks, in milliseconds (-1 waits until disposed).</summary>
    public int PollingInterval { get; }

    /// <summary>
    /// Creates the instance and immediately starts the background polling thread.
    /// </summary>
    /// <param name="incoming">Passed through to the base class constructor.</param>
    /// <param name="activelyPollingInterval">Milliseconds between ticks; must be -1 or greater.</param>
    /// <param name="services">Passed through to the base class constructor.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="activelyPollingInterval"/> is less than -1.
    /// </exception>
    public TimeWindowWithActivePollingOutgoing(IIncoming<TMessage> incoming, int activelyPollingInterval, IServiceProvider services) : base(incoming, services)
    {
        if (activelyPollingInterval < -1)
        {
            throw new ArgumentOutOfRangeException(nameof(activelyPollingInterval), activelyPollingInterval,
                "Polling interval must be -1 (infinite) or a non-negative number of milliseconds.");
        }

        this.PollingInterval = activelyPollingInterval;

        // A synchronous loop on a real thread. Passing an async lambda to Thread makes it
        // 'async void': the thread would end at the first await and any exception thrown
        // later would be unobservable and terminate the process.
        // NOTE(review): the thread starts before any derived-class constructor has run.
        this.pollingThread = new Thread(this.PollingLoop)
        {
            IsBackground = true, // must not keep the process alive if Dispose() is never called
            Name = "TimeWindowWithActivePollingOutgoing.Polling",
        };
        this.pollingThread.Start();
    }

    /// <summary>
    /// Body of the polling thread: wait one interval, send one item, repeat until disposed.
    /// </summary>
    private void PollingLoop()
    {
        // Wait() returns true as soon as Dispose() signals, false when the interval elapsed.
        while (!this.disposed && !this.stopSignal.Wait(this.PollingInterval))
        {
            try
            {
                this.SendNext();
            }
            catch (Exception ex)
            {
                // One failing send (or a throwing PollingMsgProducer) must not stop polling.
                logger.Error("TimeWindowWithActivePollingOutgoing, polling loop failed to send a message: " + ex);
            }
        }
    }

    /// <summary>
    /// Sends the oldest queued item through the matching base overload, or, when the queue is
    /// empty, the message produced by <see cref="PollingMsgProducer"/> if there is one.
    /// </summary>
    private void SendNext()
    {
        if (this.outgoingMessageQueue.TryDequeue(out object pendingForWrite))
        {
            // A dequeued item that matches none of the shapes (e.g. a null message) is dropped,
            // and no polling message is sent in that tick.
            if (pendingForWrite is TMessage plainMessage)
            {
                base.Write(plainMessage);
            }
            else if (pendingForWrite is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncRequest)
            {
                base.WriteAsync(asyncRequest.Item1, asyncRequest.Item2, asyncRequest.Item3, asyncRequest.Item4);
            }
            else if (pendingForWrite is Tuple<TMessage, object> controlledMessage)
            {
                base.Write(controlledMessage.Item1, controlledMessage.Item2);
            }

            return;
        }

        // Read the property once so a concurrent reset to null cannot slip in between
        // the null check and the call.
        Func<TMessage> producer = this.PollingMsgProducer;
        if (producer == null)
        {
            return;
        }

        TMessage pollingMessage = producer();
        if (pollingMessage != null)
        {
            base.Write(pollingMessage);
        }
    }

    /// <summary>
    /// Called after every enqueue. Above <c>maxQueueLength</c> the oldest third of the queue is
    /// discarded and the full queue content is logged as an error; above one third of
    /// <c>maxQueueLength</c> only an informational entry is logged.
    /// </summary>
    private void HandleQueueMaxLengthReachedError()
    {
        int queuedCount = this.outgoingMessageQueue.Count;

        // The overflow check has to come first: any count above the maximum is also above
        // one third of it, so testing the smaller threshold first makes trimming unreachable.
        if (queuedCount > maxQueueLength)
        {
            // Snapshot before trimming so the log shows what the queue held, in queue order.
            object[] snapshot = this.outgoingMessageQueue.ToArray();
            string logStr = string.Join(queuedMessageLogSeparator, snapshot.Select(DescribeQueuedItem));

            for (int i = 0; i < maxQueueLength / 3; i++)
            {
                this.outgoingMessageQueue.TryDequeue(out _);
            }

            logger.Error("TimeWindowWithActivePollingOutgoing, Will clear 1/3 of the queue since Max message queue length: " + maxQueueLength
                + " reached, elements are: " + logStr);
        }
        else if (queuedCount > maxQueueLength / 3)
        {
            logger.Info("TimeWindowWithActivePollingOutgoing, queue reached its 1/3 capacity, just inform and will do nothing.");
        }
    }

    /// <summary>
    /// Returns the log text of the message carried by a queued item, whichever of the three
    /// queue shapes it has.
    /// </summary>
    private static string DescribeQueuedItem(object queuedItem)
    {
        TMessage message = null;
        if (queuedItem is TMessage plainMessage)
        {
            message = plainMessage;
        }
        else if (queuedItem is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncRequest)
        {
            message = asyncRequest.Item1;
        }
        else if (queuedItem is Tuple<TMessage, object> controlledMessage)
        {
            message = controlledMessage.Item1;
        }

        return message == null ? "<null>" : message.ToLogString();
    }

    /// <summary>
    /// Queues <paramref name="message"/>; the polling thread later passes it to the base
    /// <c>Write(message)</c>.
    /// </summary>
    /// <param name="message">Message to send on a later tick.</param>
    public override void Write(TMessage message)
    {
        this.outgoingMessageQueue.Enqueue(message);
        this.HandleQueueMaxLengthReachedError();
    }

    /// <summary>
    /// Queues a request together with its response handling; the polling thread later passes
    /// all four arguments unchanged to the base <c>WriteAsync</c>.
    /// </summary>
    /// <param name="request">Message to send on a later tick.</param>
    /// <param name="responseCapture">Forwarded unchanged to the base implementation.</param>
    /// <param name="callback">Forwarded unchanged to the base implementation.</param>
    /// <param name="timeout">Forwarded unchanged to the base implementation.</param>
    public override void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
    {
        this.outgoingMessageQueue.Enqueue(new Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>(request, responseCapture, callback, timeout));
        this.HandleQueueMaxLengthReachedError();
    }

    /// <summary>
    /// Queues <paramref name="message"/> with its control parameter; the polling thread later
    /// passes both to the base <c>Write(message, extraControlParameter)</c>.
    /// </summary>
    /// <param name="message">Message to send on a later tick.</param>
    /// <param name="extraControlParameter">Forwarded unchanged to the base implementation.</param>
    public override void Write(TMessage message, object extraControlParameter)
    {
        this.outgoingMessageQueue.Enqueue(new Tuple<TMessage, object>(message, extraControlParameter));
        this.HandleQueueMaxLengthReachedError();
    }

    /// <summary>
    /// Stops the polling thread. Items still queued are not sent. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        this.disposed = true;

        // Wake the polling thread if it is in the middle of an interval. The thread is not
        // joined, so calling Dispose() from a callback running on that thread cannot deadlock.
        this.stopSignal.Set();
    }
}
- }
|