TimeWindowWithNegativePollingOutgoing.cs 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. using Edge.Core.Parser;
  2. using Edge.Core.Parser.BinaryParser.MessageEntity;
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using Timer = System.Timers.Timer;
  10. namespace Edge.Core.Processor
  11. {
  /// <summary>
  /// Outgoing pipeline that works like a ping-pong game: calls to the Write functions only queue a
  /// message, and an actual write is triggered solely by an incoming message. After an incoming
  /// message arrives, a worker thread waits <see cref="SinkTime"/> milliseconds and then writes either
  /// the most recently queued message or, when nothing is queued, the message produced by
  /// <see cref="DefaultOutgoingMsgProducer"/>.
  /// </summary>
  /// <typeparam name="TRaw">Raw data type forwarded to the <c>Outgoing</c> base class.</typeparam>
  /// <typeparam name="TMessage">Message type handled by this pipeline.</typeparam>
  public class TimeWindowWithNegativePollingOutgoing<TRaw, TMessage> : Outgoing<TRaw, TMessage> where TMessage : MessageBase
  {
      /// <summary>
      /// Maximum number of pending writes that may wait for the next incoming message.
      /// </summary>
      private const int maxQueueLength = 50;

      // NOTE(review): despite its name this is a LIFO stack, so the most recently queued write is
      // always sent first. That matches DeliveryMode.OnlyLastWrite; in DeliveryMode.AllWrite it means
      // pending writes go out newest-first. Confirm that ordering is intended.
      private readonly ConcurrentStack<object> outgoingMessageQueue = new ConcurrentStack<object>();

      /// <summary>
      /// Signalled by the incoming-message handler to wake the worker thread; reset by the worker
      /// at the end of every cycle.
      /// </summary>
      private readonly ManualResetEvent blocker = new ManualResetEvent(false);

      /// <summary>
      /// The incoming pipeline whose latest message is handed to <see cref="DefaultOutgoingMsgProducer"/>.
      /// </summary>
      private readonly IIncoming<TMessage> incomingSource;

      private readonly Thread guranteeOnTimeRespondingThread;

      /// <summary>
      /// The last queued message that was actually written by the worker thread
      /// (default responses are not recorded here).
      /// </summary>
      private TMessage lastWriteMsg;

      /// <summary>
      /// When a message comes in, how long (in milliseconds) to wait before the responding
      /// (to device) message is sent out. Defaults to 100 ms. Negative values are treated as 0.
      /// </summary>
      public int SinkTime { get; set; } = 100;

      public enum DeliveryMode
      {
          /// <summary>
          /// Only the last message queued by Write(...) is guaranteed to be sent in the next cycle;
          /// everything else still pending at that moment is discarded.
          /// </summary>
          OnlyLastWrite,

          /// <summary>
          /// Every message queued by Write(...) is guaranteed to be sent, one per cycle.
          /// </summary>
          AllWrite
      }

      /// <summary>
      /// Gets or sets how pending writes are delivered. Defaults to <see cref="DeliveryMode.OnlyLastWrite"/>.
      /// </summary>
      public DeliveryMode Mode { get; set; } = DeliveryMode.OnlyLastWrite;

      /// <summary>
      /// Gets or sets the producer of the default responding (outgoing to device) message.
      /// It is called with the current incoming message and the last written message whenever a
      /// cycle elapses with nothing queued. When it is not set, no default response is sent.
      /// </summary>
      public Func<TMessage, TMessage, TMessage> DefaultOutgoingMsgProducer { get; set; }

      /// <summary>
      /// Creates the pipeline, starts the responding worker thread and subscribes to
      /// <paramref name="incoming"/> so that each incoming message triggers one write cycle.
      /// </summary>
      /// <param name="incoming">Incoming pipeline whose messages trigger the outgoing writes.</param>
      /// <param name="services">Service provider passed through to the base class.</param>
      public TimeWindowWithNegativePollingOutgoing(IIncoming<TMessage> incoming, IServiceProvider services) : base(incoming, services)
      {
          this.incomingSource = incoming;

          this.guranteeOnTimeRespondingThread = new Thread(this.RespondingLoop)
          {
              // The loop never terminates on its own; a background thread does not keep the
              // process alive once every foreground thread has finished.
              IsBackground = true
          };
          this.guranteeOnTimeRespondingThread.Start();

          incoming.OnMessageIncoming += (_, __) => this.blocker.Set();
      }

      /// <summary>
      /// Worker loop: waits for an incoming message, sleeps for the sink time, then performs exactly
      /// one write (a queued one if available, otherwise the default response).
      /// </summary>
      private void RespondingLoop()
      {
          while (true)
          {
              try
              {
                  this.blocker.WaitOne();

                  // Thread.Sleep throws for values below -1 and blocks forever for -1, so clamp.
                  Thread.Sleep(Math.Max(0, this.SinkTime));

                  if (this.outgoingMessageQueue.TryPop(out object pendingForWrite))
                  {
                      this.SendPending(pendingForWrite);
                      if (this.Mode == DeliveryMode.OnlyLastWrite)
                      {
                          // Only the newest write matters in this mode; drop the older ones.
                          this.outgoingMessageQueue.Clear();
                      }
                  }
                  else
                  {
                      this.SendDefaultResponse();
                  }
              }
              finally
              {
                  // Re-arm for the next incoming message. Signals raised while this cycle was
                  // running are discarded here.
                  // NOTE(review): exceptions thrown by the base write calls are not caught and
                  // therefore escape this thread — confirm that is the desired failure mode.
                  this.blocker.Reset();
              }
          }
      }

      /// <summary>
      /// Forwards one queued entry to the matching base-class write function and records its
      /// message as the last written one. Entries of an unknown shape (including null) are ignored.
      /// </summary>
      private void SendPending(object pendingForWrite)
      {
          switch (pendingForWrite)
          {
              case TMessage plainMessage:
                  base.Write(plainMessage);
                  this.lastWriteMsg = plainMessage;
                  break;

              case Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncRequest:
                  base.WriteAsync(asyncRequest.Item1, asyncRequest.Item2, asyncRequest.Item3, asyncRequest.Item4);
                  this.lastWriteMsg = asyncRequest.Item1;
                  break;

              case Tuple<TMessage, object> writeWith2Parameter:
                  base.Write(writeWith2Parameter.Item1, writeWith2Parameter.Item2);
                  this.lastWriteMsg = writeWith2Parameter.Item1;
                  break;
          }
      }

      /// <summary>
      /// Writes the message produced by <see cref="DefaultOutgoingMsgProducer"/>, if a producer is set.
      /// </summary>
      private void SendDefaultResponse()
      {
          var producer = this.DefaultOutgoingMsgProducer;
          if (producer == null)
          {
              // No producer configured: there is nothing to answer with in this cycle.
              return;
          }

          var defaultMsg = producer(this.incomingSource.Message, this.lastWriteMsg);
          base.Write(defaultMsg);
      }

      /// <summary>
      /// Adds a pending write to the queue, rejecting it when the queue is already full.
      /// The check runs before the push so that a rejected write is not left in the queue.
      /// Concurrent callers may overshoot the limit by at most one entry per racing caller.
      /// </summary>
      /// <exception cref="ArgumentException">The queue already holds the maximum number of entries.</exception>
      private void Enqueue(object pendingWrite)
      {
          if (this.outgoingMessageQueue.Count >= maxQueueLength)
          {
              throw new ArgumentException("Max message queue length: " + maxQueueLength + " reached");
          }

          this.outgoingMessageQueue.Push(pendingWrite);
      }

      #region Write functions

      /// <summary>
      /// Queues the message; the actual write is performed on another thread after the next
      /// incoming message.
      /// </summary>
      /// <param name="message">Message to send.</param>
      /// <exception cref="ArgumentException">The pending-write queue is full.</exception>
      public override void Write(TMessage message)
      {
          this.Enqueue(message);
      }

      /// <summary>
      /// Queues the request; the actual write is performed on another thread after the next
      /// incoming message.
      /// </summary>
      /// <param name="request">Message to send.</param>
      /// <param name="responseCapture">Passed unchanged to the base class <c>WriteAsync</c>.</param>
      /// <param name="callback">Passed unchanged to the base class <c>WriteAsync</c>.</param>
      /// <param name="timeout">Passed unchanged to the base class <c>WriteAsync</c>.</param>
      /// <exception cref="ArgumentException">The pending-write queue is full.</exception>
      public override void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
      {
          this.Enqueue(new Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>(request, responseCapture, callback, timeout));
      }

      /// <summary>
      /// Queues the message together with its control parameter; the actual write is performed on
      /// another thread after the next incoming message.
      /// </summary>
      /// <param name="message">Message to send.</param>
      /// <param name="extraControlParameter">Passed unchanged to the base class <c>Write</c>.</param>
      /// <exception cref="ArgumentException">The pending-write queue is full.</exception>
      public override void Write(TMessage message, object extraControlParameter)
      {
          this.Enqueue(new Tuple<TMessage, object>(message, extraControlParameter));
      }

      #endregion
  }
  159. }