using Edge.Core.Parser;
using Edge.Core.Parser.BinaryParser.MessageEntity;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Timer = System.Timers.Timer;

namespace Edge.Core.Processor
{
    /// <summary>
    /// Works like a ping-pong game: an outgoing message is only written after an incoming
    /// message has been observed. Calls to the <c>Write</c> overloads never write directly;
    /// they queue the message, and a dedicated worker thread sends it
    /// <see cref="SinkTime"/> milliseconds after the next incoming message.
    /// When nothing is queued at that moment, the message produced by
    /// <see cref="DefaultOutgoingMsgProducer"/> is sent instead.
    /// </summary>
    /// <remarks>
    /// Incoming messages that arrive while the worker is still sleeping or sending are
    /// coalesced into the cycle in progress, because the wake-up signal is reset at the
    /// end of every cycle.
    /// </remarks>
    /// <typeparam name="TRaw"></typeparam>
    /// <typeparam name="TMessage"></typeparam>
    public class TimeWindowWithNegativePollingOutgoing<TRaw, TMessage> : Outgoing<TRaw, TMessage> where TMessage : MessageBase
    {
        /// <summary>
        /// Maximum number of writes that may be waiting for a send cycle.
        /// </summary>
        private const int maxQueueLength = 50;

        /// <summary>
        /// Pending writes in arrival order. An entry is either a bare <typeparamref name="TMessage"/>
        /// or a <see cref="Tuple"/> carrying the arguments of the matching base overload.
        /// </summary>
        private readonly ConcurrentQueue<object> outgoingMessageQueue = new ConcurrentQueue<object>();

        /// <summary>
        /// When a message comes in, how long (in milliseconds) to wait before the responding
        /// (to device) message is sent out. Defaults to 100 ms. Negative values are treated as 0.
        /// </summary>
        public int SinkTime { get; set; } = 100;

        private readonly Thread guaranteeOnTimeRespondingThread;

        public enum DeliveryMode
        {
            /// <summary>
            /// Only the last message passed to Write(...) (and queued here) is guaranteed
            /// to be sent in the next cycle; older pending messages are discarded.
            /// </summary>
            OnlyLastWrite,

            /// <summary>
            /// Every message passed to Write(...) (and queued here) is guaranteed to be
            /// sent, one per cycle, in the order it was written.
            /// </summary>
            AllWrite
        }

        /// <summary>
        /// Gets or sets how queued writes are delivered. Defaults to
        /// <see cref="DeliveryMode.OnlyLastWrite"/>.
        /// </summary>
        public DeliveryMode Mode { get; set; } = DeliveryMode.OnlyLastWrite;

        /// <summary>
        /// Signalled by the incoming-message handler to wake the worker thread.
        /// </summary>
        private readonly ManualResetEvent blocker = new ManualResetEvent(false);

        /// <summary>
        /// Gets or sets the producer of the default responding (outgoing to device) message.
        /// It is invoked with the current incoming message and the last message written from
        /// the queue, and its result is sent when the time window elapses with nothing queued.
        /// When it is <c>null</c>, no default message is sent for that cycle.
        /// </summary>
        public Func<TMessage, TMessage, TMessage> DefaultOutgoingMsgProducer { get; set; }

        /// <summary>
        /// The last queued message that was written out; only touched by the worker thread.
        /// </summary>
        private TMessage lastWriteMsg;

        /// <summary>
        /// Creates the instance, starts the responding worker thread and subscribes to
        /// <paramref name="incoming"/> so that every incoming message triggers a send cycle.
        /// </summary>
        /// <param name="incoming">source of incoming messages that drive the send cycles.</param>
        /// <param name="services">passed through to the base class.</param>
        public TimeWindowWithNegativePollingOutgoing(IIncoming<TMessage> incoming, IServiceProvider services)
            : base(incoming, services)
        {
            this.guaranteeOnTimeRespondingThread = new Thread(() => this.RespondLoop(incoming))
            {
                // The loop never terminates on its own; a foreground thread would keep
                // the process alive forever.
                IsBackground = true
            };
            this.guaranteeOnTimeRespondingThread.Start();

            incoming.OnMessageIncoming += (_, __) => { this.blocker.Set(); };
        }

        /// <summary>
        /// Worker loop: waits for an incoming message, waits <see cref="SinkTime"/>, then
        /// sends either the next pending write or the default message.
        /// </summary>
        /// <remarks>
        /// Exceptions thrown by the base write calls or by the producer are not caught here
        /// and therefore propagate out of the worker thread.
        /// </remarks>
        private void RespondLoop(IIncoming<TMessage> incoming)
        {
            while (true)
            {
                try
                {
                    this.blocker.WaitOne();

                    // Thread.Sleep throws on negative values (other than -1), so clamp.
                    Thread.Sleep(Math.Max(0, this.SinkTime));

                    if (this.TryTakePendingWrite(out object pendingForWrite))
                    {
                        this.SendPending(pendingForWrite);
                    }
                    else
                    {
                        this.SendDefault(incoming.Message);
                    }
                }
                finally
                {
                    // Re-arm for the next incoming message, even when sending failed.
                    this.blocker.Reset();
                }
            }
        }

        /// <summary>
        /// Takes the entry to send in this cycle. In <see cref="DeliveryMode.AllWrite"/> that is
        /// the oldest pending entry; in <see cref="DeliveryMode.OnlyLastWrite"/> the backlog
        /// present at the time of the call is discarded and only its newest entry is returned.
        /// </summary>
        /// <returns><c>true</c> when an entry was taken; <c>false</c> when nothing is pending.</returns>
        private bool TryTakePendingWrite(out object pending)
        {
            if (!this.outgoingMessageQueue.TryDequeue(out pending))
            {
                return false;
            }

            if (this.Mode == DeliveryMode.OnlyLastWrite)
            {
                // Collapse only the backlog seen now (bounded by a snapshot of the count);
                // writes arriving while this cycle is being sent stay queued for the next one.
                int backlog = this.outgoingMessageQueue.Count;
                for (int i = 0; i < backlog && this.outgoingMessageQueue.TryDequeue(out object newer); i++)
                {
                    pending = newer;
                }
            }

            return true;
        }

        /// <summary>
        /// Forwards a queued entry to the base overload it was queued for and remembers its
        /// message as the last written one. Entries of any other shape (including
        /// <c>null</c>) are dropped without sending anything.
        /// </summary>
        private void SendPending(object pending)
        {
            if (pending is TMessage msg)
            {
                base.Write(msg);
                this.lastWriteMsg = msg;
            }
            else if (pending is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncWrite)
            {
                base.WriteAsync(asyncWrite.Item1, asyncWrite.Item2, asyncWrite.Item3, asyncWrite.Item4);
                this.lastWriteMsg = asyncWrite.Item1;
            }
            else if (pending is Tuple<TMessage, object> writeWith2Parameter)
            {
                base.Write(writeWith2Parameter.Item1, writeWith2Parameter.Item2);
                this.lastWriteMsg = writeWith2Parameter.Item1;
            }
        }

        /// <summary>
        /// Sends the message built by <see cref="DefaultOutgoingMsgProducer"/>; does nothing
        /// when no producer has been configured.
        /// </summary>
        private void SendDefault(TMessage incomingMessage)
        {
            var producer = this.DefaultOutgoingMsgProducer;
            if (producer == null)
            {
                return;
            }

            base.Write(producer(incomingMessage, this.lastWriteMsg));
        }

        /// <summary>
        /// Queues an entry for a later send cycle, rejecting it when the queue is full.
        /// The capacity check happens before enqueueing so that a rejected write is never sent.
        /// </summary>
        /// <exception cref="ArgumentException">the queue already holds the maximum number of entries.</exception>
        private void EnqueuePending(object pendingWrite)
        {
            if (this.outgoingMessageQueue.Count >= maxQueueLength)
            {
                throw new ArgumentException("Max message queue length: " + maxQueueLength + " reached");
            }

            this.outgoingMessageQueue.Enqueue(pendingWrite);
        }

        #region Write functions

        /// <summary>
        /// Queues <paramref name="message"/>; the actual write is performed on another thread.
        /// </summary>
        /// <param name="message"></param>
        /// <exception cref="ArgumentException">the pending queue is full.</exception>
        public override void Write(TMessage message)
        {
            this.EnqueuePending(message);
        }

        /// <summary>
        /// Queues the request together with its response handling arguments; the actual
        /// write is performed on another thread.
        /// </summary>
        /// <param name="request"></param>
        /// <param name="responseCapture"></param>
        /// <param name="callback"></param>
        /// <param name="timeout"></param>
        /// <exception cref="ArgumentException">the pending queue is full.</exception>
        public override void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
        {
            this.EnqueuePending(
                new Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>(
                    request, responseCapture, callback, timeout));
        }

        /// <summary>
        /// Queues <paramref name="message"/> with its extra control parameter; the actual
        /// write is performed on another thread.
        /// </summary>
        /// <param name="message"></param>
        /// <param name="extraControlParameter"></param>
        /// <exception cref="ArgumentException">the pending queue is full.</exception>
        public override void Write(TMessage message, object extraControlParameter)
        {
            this.EnqueuePending(new Tuple<TMessage, object>(message, extraControlParameter));
        }

        #endregion
    }
}