123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- using Edge.Core.Parser;
- using Edge.Core.Parser.BinaryParser.MessageEntity;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- using Timer = System.Timers.Timer;
namespace Edge.Core.Processor
{
    /// <summary>
    /// An <see cref="Outgoing{TRaw, TMessage}"/> that never sends at the moment a caller
    /// writes. Writes are parked on an internal stack; a dedicated worker thread waits for
    /// the incoming side to raise <c>OnMessageIncoming</c>, sleeps for <see cref="SinkTime"/>
    /// milliseconds, and then sends exactly one message through the base class: the most
    /// recently written pending item if there is one, otherwise a message built by
    /// <see cref="DefaultOutgoingMsgProducer"/>.
    /// </summary>
    /// <remarks>
    /// Exceptions thrown by the base write calls or by the producer delegate are not caught
    /// by the worker loop; they propagate out of the worker thread.
    /// </remarks>
    /// <typeparam name="TRaw">Raw type parameter forwarded to the base class.</typeparam>
    /// <typeparam name="TMessage">Message type; must derive from <c>MessageBase</c>.</typeparam>
    public class TimeWindowWithNegativePollingOutgoing<TRaw, TMessage> : Outgoing<TRaw, TMessage> where TMessage : MessageBase
    {
        /// <summary>Upper bound on the number of pending writes accepted by the Write overloads.</summary>
        private const int maxQueueLength = 50;

        /// <summary>
        /// Pending writes. Each entry is either a bare <typeparamref name="TMessage"/> or a
        /// <see cref="Tuple"/> carrying the arguments of the corresponding overload.
        /// It is a stack, so the worker always picks the newest write first.
        /// </summary>
        private readonly ConcurrentStack<object> outgoingMessageQueue = new ConcurrentStack<object>();

        /// <summary>
        /// Delay, in milliseconds, the worker sleeps between being signalled by an incoming
        /// message and sending the response. Defaults to 100.
        /// </summary>
        public int SinkTime { get; set; } = 100;

        private readonly Thread guaranteeOnTimeRespondingThread;

        /// <summary>Controls what happens to the other pending writes once one has been sent.</summary>
        public enum DeliveryMode
        {
            /// <summary>
            /// After the newest pending write is sent, every other pending write is discarded.
            /// </summary>
            OnlyLastWrite,

            /// <summary>
            /// Remaining pending writes are kept and sent one per subsequent cycle.
            /// NOTE(review): entries are popped from a stack, so a backlog is sent newest-first;
            /// confirm that ordering is intended.
            /// </summary>
            AllWrite
        }

        /// <summary>Delivery mode applied by the worker; defaults to <see cref="DeliveryMode.OnlyLastWrite"/>.</summary>
        public DeliveryMode Mode { get; set; } = DeliveryMode.OnlyLastWrite;

        /// <summary>Set by the incoming-message handler to wake the worker; reset by the worker after each cycle.</summary>
        private readonly ManualResetEvent blocker = new ManualResetEvent(false);

        /// <summary>
        /// Builds the message to send when a cycle finds no pending write. It is called with
        /// the current <c>incoming.Message</c> and the last message this instance sent from the
        /// pending stack (null until the first such send). When this property is null, nothing
        /// is sent for that cycle.
        /// </summary>
        public Func<TMessage, TMessage, TMessage> DefaultOutgoingMsgProducer { get; set; }

        /// <summary>Last message taken from the pending stack and handed to the base class.</summary>
        private TMessage lastWriteMsg;

        /// <summary>
        /// Creates the instance, starts the worker thread, and subscribes to
        /// <c>incoming.OnMessageIncoming</c> so each incoming message wakes the worker.
        /// </summary>
        /// <param name="incoming">Incoming side whose messages trigger a response cycle.</param>
        /// <param name="services">Service provider forwarded to the base constructor.</param>
        public TimeWindowWithNegativePollingOutgoing(IIncoming<TMessage> incoming, IServiceProvider services) : base(incoming, services)
        {
            this.guaranteeOnTimeRespondingThread = new Thread(() => this.RespondLoop(incoming))
            {
                // The loop never terminates on its own; as a background thread it does not
                // keep the process alive once all foreground threads have finished.
                IsBackground = true
            };
            this.guaranteeOnTimeRespondingThread.Start();

            incoming.OnMessageIncoming += (_, __) =>
            {
                this.blocker.Set();
            };
        }

        /// <summary>
        /// Worker body: wait for a signal, sleep <see cref="SinkTime"/>, send one message, repeat.
        /// </summary>
        private void RespondLoop(IIncoming<TMessage> incoming)
        {
            while (true)
            {
                try
                {
                    this.blocker.WaitOne();
                    Thread.Sleep(this.SinkTime);

                    if (this.outgoingMessageQueue.TryPop(out object pendingForWrite))
                    {
                        this.DeliverPending(pendingForWrite);
                        if (this.Mode == DeliveryMode.OnlyLastWrite)
                        {
                            this.outgoingMessageQueue.Clear();
                        }
                    }
                    else
                    {
                        // Read the property once so a concurrent setter cannot null it
                        // between the check and the call.
                        var producer = this.DefaultOutgoingMsgProducer;
                        if (producer != null)
                        {
                            base.Write(producer(incoming.Message, this.lastWriteMsg));
                        }
                    }
                }
                finally
                {
                    // Signals raised while this cycle was running are dropped here, so at
                    // most one message is sent per cycle.
                    this.blocker.Reset();
                }
            }
        }

        /// <summary>
        /// Sends one popped entry through the base overload that matches how it was enqueued
        /// and records its message as the last one written.
        /// </summary>
        private void DeliverPending(object pendingForWrite)
        {
            if (pendingForWrite is TMessage msg)
            {
                base.Write(msg);
                this.lastWriteMsg = msg;
            }
            else if (pendingForWrite is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncWrite)
            {
                base.WriteAsync(asyncWrite.Item1, asyncWrite.Item2, asyncWrite.Item3, asyncWrite.Item4);
                this.lastWriteMsg = asyncWrite.Item1;
            }
            else if (pendingForWrite is Tuple<TMessage, object> writeWith2Parameter)
            {
                base.Write(writeWith2Parameter.Item1, writeWith2Parameter.Item2);
                this.lastWriteMsg = writeWith2Parameter.Item1;
            }
        }

        /// <summary>
        /// Adds an entry to the pending stack, rejecting it when the stack is already full.
        /// </summary>
        /// <exception cref="ArgumentException">The stack already holds the maximum number of entries.</exception>
        private void Enqueue(object pendingWrite)
        {
            // Check before pushing: a write that is reported as rejected must not be left
            // on the stack, otherwise the stack keeps growing past the limit.
            if (this.outgoingMessageQueue.Count >= maxQueueLength)
            {
                throw new ArgumentException("Max message queue length: " + maxQueueLength + " reached");
            }

            this.outgoingMessageQueue.Push(pendingWrite);
        }

        #region Write functions

        /// <summary>
        /// Parks <paramref name="message"/> for deferred sending by the worker thread.
        /// </summary>
        /// <param name="message">Message to send on a later cycle.</param>
        /// <exception cref="ArgumentException">The pending stack is full.</exception>
        public override void Write(TMessage message)
        {
            this.Enqueue(message);
        }

        /// <summary>
        /// Parks a request together with its response-matching arguments; the worker later
        /// forwards all four values unchanged to the base <c>WriteAsync</c>.
        /// </summary>
        /// <param name="request">Request message to send on a later cycle.</param>
        /// <param name="responseCapture">Forwarded to the base <c>WriteAsync</c>.</param>
        /// <param name="callback">Forwarded to the base <c>WriteAsync</c>.</param>
        /// <param name="timeout">Forwarded to the base <c>WriteAsync</c>.</param>
        /// <exception cref="ArgumentException">The pending stack is full.</exception>
        public override void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
        {
            this.Enqueue(new Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>(request, responseCapture, callback, timeout));
        }

        /// <summary>
        /// Parks <paramref name="message"/> with an extra control parameter; the worker later
        /// forwards both to the matching base <c>Write</c> overload.
        /// </summary>
        /// <param name="message">Message to send on a later cycle.</param>
        /// <param name="extraControlParameter">Forwarded unchanged to the base <c>Write</c>.</param>
        /// <exception cref="ArgumentException">The pending stack is full.</exception>
        public override void Write(TMessage message, object extraControlParameter)
        {
            this.Enqueue(new Tuple<TMessage, object>(message, extraControlParameter));
        }

        #endregion
    }
}
|