using Edge.Core.Parser; using Edge.Core.Parser.BinaryParser.MessageEntity; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using Timer = System.Timers.Timer; namespace Edge.Core.Processor { /// /// More like a ping pong game, only an incoming message, then trigger an outgoing write message. /// otherwise, the outgoing write message will be queued and wait. /// /// /// public class TimeWindowWithNegativePollingOutgoing : Outgoing where TMessage : MessageBase { private const int maxQueueLength = 50; private ConcurrentStack outgoingMessageQueue = new ConcurrentStack(); /// /// when a message incame, how much time would delay before a responding(to device) message will send out. /// default set to 100ms. /// public int SinkTime { get; set; } = 100; private Thread guranteeOnTimeRespondingThread; public enum DeliveryMode { /// /// only last msg send by Write(...) (which queued here) will guranteed to be send in next cycle /// OnlyLastWrite, /// /// all msg send by Write(...) (which queued here) will guranteed to be send in next cycles /// AllWrite } public DeliveryMode Mode { get; set; } = DeliveryMode.OnlyLastWrite; private ManualResetEvent blocker = new ManualResetEvent(false); /// /// Gets or sets the predict for default responding(outgoing to device) message. /// The message will be send if the time window elapsed. /// public Func DefaultOutgoingMsgProducer { get; set; } private TMessage lastWriteMsg; /// /// 0 for not allowed, 1 for allowed. /// private int allowWriteThrough = 0; public TimeWindowWithNegativePollingOutgoing(IIncoming incoming, IServiceProvider services) : base(incoming, services) { this.guranteeOnTimeRespondingThread = new Thread(() => { while (true) { try { if (this.blocker.WaitOne()) { Thread.Sleep(this.SinkTime); if (this.outgoingMessageQueue.TryPop(out object pendingForWrite)) { if (pendingForWrite is TMessage msg) { base.Write(msg); this.lastWriteMsg = msg; } else if (pendingForWrite is Tuple, Action, int> ___) { base.WriteAsync(___.Item1, ___.Item2, ___.Item3, ___.Item4); this.lastWriteMsg = ___.Item1; } else if (pendingForWrite is Tuple writeWith2Parameter) { base.Write(writeWith2Parameter.Item1, writeWith2Parameter.Item2); this.lastWriteMsg = writeWith2Parameter.Item1; } if (this.Mode == DeliveryMode.OnlyLastWrite) this.outgoingMessageQueue.Clear(); } else { var defaultMsg = this.DefaultOutgoingMsgProducer(incoming.Message, this.lastWriteMsg); base.Write(defaultMsg); } } } finally { this.blocker.Reset(); } }; }); this.guranteeOnTimeRespondingThread.Start(); incoming.OnMessageIncoming += (_, __) => { this.blocker.Set(); }; } #region Write functions /// /// actual write action will be performed on another thread. /// /// public override void Write(TMessage message) { this.outgoingMessageQueue.Push(message); if (this.outgoingMessageQueue.Count > maxQueueLength) throw new ArgumentException("Max message queue length: " + maxQueueLength + " reached"); //InternalTryWrite(); } /// /// actual write action will be performed on another thread. /// /// /// /// /// public override void WriteAsync(TMessage request, Func responseCapture, Action callback, int timeout) { this.outgoingMessageQueue.Push(new Tuple, Action, int>(request, responseCapture, callback, timeout)); if (this.outgoingMessageQueue.Count > maxQueueLength) throw new ArgumentException("Max message queue length: " + maxQueueLength + " reached"); //InternalTryWrite(); } /// /// actual write action will be performed on another thread. /// /// /// public override void Write(TMessage message, object extraControlParameter) { this.outgoingMessageQueue.Push(new Tuple(message, extraControlParameter)); if (this.outgoingMessageQueue.Count > maxQueueLength) throw new ArgumentException("Max message queue length: " + maxQueueLength + " reached"); //InternalTryWrite(); } #endregion //private void InternalTryWrite() //{ // if (1 == Interlocked.CompareExchange(ref this.allowWriteThrough, 0, 1)) // { // if (this.outgoingMessageQueue.TryDequeue(out object pendingForWrite)) // { // if (pendingForWrite is TMessage msg) // { // base.Write(msg); // } // else if (pendingForWrite is Tuple, Action, int> ___) // { // base.WriteAsync(___.Item1, ___.Item2, ___.Item3, ___.Item4); // } // else if (pendingForWrite is Tuple writeWith2Parameter) // { // base.Write(writeWith2Parameter.Item1, writeWith2Parameter.Item2); // } // } // } //} } }