123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- using Edge.Core.Parser;
- using Edge.Core.Parser.BinaryParser.MessageEntity;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- using Timer = System.Timers.Timer;
- namespace Edge.Core.Processor
- {
- /// <summary>
- /// More like a ping pong game, only an incoming message, then trigger an outgoing write message.
- /// otherwise, the outgoing write message will be queued and wait.
- /// </summary>
- /// <typeparam name="TRaw"></typeparam>
- /// <typeparam name="TMessage"></typeparam>
- public class TimeWindowWithNegativePollingOutgoing<TRaw, TMessage> : Outgoing<TRaw, TMessage> where TMessage : MessageBase
- {
- private const int maxQueueLength = 50;
- private ConcurrentStack<object> outgoingMessageQueue = new ConcurrentStack<object>();
- /// <summary>
- /// when a message incame, how much time would delay before a responding(to device) message will send out.
- /// default set to 100ms.
- /// </summary>
- public int SinkTime { get; set; } = 100;
- private Thread guranteeOnTimeRespondingThread;
- public enum DeliveryMode
- {
- /// <summary>
- /// only last msg send by Write(...) (which queued here) will guranteed to be send in next cycle
- /// </summary>
- OnlyLastWrite,
- /// <summary>
- /// all msg send by Write(...) (which queued here) will guranteed to be send in next cycles
- /// </summary>
- AllWrite
- }
- public DeliveryMode Mode { get; set; } = DeliveryMode.OnlyLastWrite;
- private ManualResetEvent blocker = new ManualResetEvent(false);
- /// <summary>
- /// Gets or sets the predict for default responding(outgoing to device) message.
- /// The message will be send if the time window elapsed.
- /// </summary>
- public Func<TMessage, TMessage, TMessage> DefaultOutgoingMsgProducer { get; set; }
- private TMessage lastWriteMsg;
- /// <summary>
- /// 0 for not allowed, 1 for allowed.
- /// </summary>
- private int allowWriteThrough = 0;
- public TimeWindowWithNegativePollingOutgoing(IIncoming<TMessage> incoming, IServiceProvider services) : base(incoming, services)
- {
- this.guranteeOnTimeRespondingThread = new Thread(() =>
- {
- while (true)
- {
- try
- {
- if (this.blocker.WaitOne())
- {
- Thread.Sleep(this.SinkTime);
- if (this.outgoingMessageQueue.TryPop(out object pendingForWrite))
- {
- if (pendingForWrite is TMessage msg)
- {
- base.Write(msg);
- this.lastWriteMsg = msg;
- }
- else if (pendingForWrite is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> ___)
- {
- base.WriteAsync(___.Item1, ___.Item2, ___.Item3, ___.Item4);
- this.lastWriteMsg = ___.Item1;
- }
- else if (pendingForWrite is Tuple<TMessage, object> writeWith2Parameter)
- {
- base.Write(writeWith2Parameter.Item1, writeWith2Parameter.Item2);
- this.lastWriteMsg = writeWith2Parameter.Item1;
- }
- if (this.Mode == DeliveryMode.OnlyLastWrite)
- this.outgoingMessageQueue.Clear();
- }
- else
- {
- var defaultMsg = this.DefaultOutgoingMsgProducer(incoming.Message, this.lastWriteMsg);
- base.Write(defaultMsg);
- }
- }
- }
- finally
- {
- this.blocker.Reset();
- }
- };
- });
- this.guranteeOnTimeRespondingThread.Start();
- incoming.OnMessageIncoming += (_, __) =>
- {
- this.blocker.Set();
- };
- }
- #region Write functions
- /// <summary>
- /// actual write action will be performed on another thread.
- /// </summary>
- /// <param name="message"></param>
- public override void Write(TMessage message)
- {
- this.outgoingMessageQueue.Push(message);
- if (this.outgoingMessageQueue.Count > maxQueueLength) throw new ArgumentException("Max message queue length: " + maxQueueLength + " reached");
- //InternalTryWrite();
- }
- /// <summary>
- /// actual write action will be performed on another thread.
- /// </summary>
- /// <param name="request"></param>
- /// <param name="responseCapture"></param>
- /// <param name="callback"></param>
- /// <param name="timeout"></param>
- public override void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
- {
- this.outgoingMessageQueue.Push(new Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>(request, responseCapture, callback, timeout));
- if (this.outgoingMessageQueue.Count > maxQueueLength) throw new ArgumentException("Max message queue length: " + maxQueueLength + " reached");
- //InternalTryWrite();
- }
- /// <summary>
- /// actual write action will be performed on another thread.
- /// </summary>
- /// <param name="message"></param>
- /// <param name="extraControlParameter"></param>
- public override void Write(TMessage message, object extraControlParameter)
- {
- this.outgoingMessageQueue.Push(new Tuple<TMessage, object>(message, extraControlParameter));
- if (this.outgoingMessageQueue.Count > maxQueueLength) throw new ArgumentException("Max message queue length: " + maxQueueLength + " reached");
- //InternalTryWrite();
- }
- #endregion
- //private void InternalTryWrite()
- //{
- // if (1 == Interlocked.CompareExchange(ref this.allowWriteThrough, 0, 1))
- // {
- // if (this.outgoingMessageQueue.TryDequeue(out object pendingForWrite))
- // {
- // if (pendingForWrite is TMessage msg)
- // {
- // base.Write(msg);
- // }
- // else if (pendingForWrite is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> ___)
- // {
- // base.WriteAsync(___.Item1, ___.Item2, ___.Item3, ___.Item4);
- // }
- // else if (pendingForWrite is Tuple<TMessage, object> writeWith2Parameter)
- // {
- // base.Write(writeWith2Parameter.Item1, writeWith2Parameter.Item2);
- // }
- // }
- // }
- //}
- }
- }
|