TimeWindowWithActivePollingOutgoing.cs 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. using Edge.Core.Parser;
  2. using Edge.Core.Parser.BinaryParser.MessageEntity;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using Timer = System.Threading.Timer;
  9. using System.Collections.Concurrent;
  10. namespace Edge.Core.Processor
  11. {
  12. /// <summary>
  13. /// Has a built-in polling loop that periodically sends messages from the queue,
  14. /// so every message written is first queued here and then sent after a delay.
  15. /// NOTE,
  16. /// </summary>
  17. /// <typeparam name="TRaw"></typeparam>
  18. /// <typeparam name="TMessage"></typeparam>
  public class TimeWindowWithActivePollingOutgoing<TRaw, TMessage> : Outgoing<TRaw, TMessage>, IDisposable where TMessage : MessageBase
  {
      private static readonly NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Communicator");

      /// <summary>
      /// Queue length above which the oldest third of the queued items is dropped.
      /// The underlying communicator has been seen going silent for seconds (maybe the remote device got stuck)
      /// and then flushing a bunch of bytes at once; all of them get deserialized into messages and piped into
      /// the handlers, which then call Write(...) for a bunch of responses and fill this queue very quickly.
      /// </summary>
      private const int maxQueueLength = 200;

      /// <summary>
      /// Pending items in send order. An element is one of: a bare TMessage (from Write(message)),
      /// a 4-item Tuple (from WriteAsync) or a 2-item Tuple (from Write(message, extraControlParameter)).
      /// </summary>
      private readonly ConcurrentQueue<object> outgoingMessageQueue = new ConcurrentQueue<object>();

      /// <summary>
      /// Guards <see cref="disposed"/> and lets <see cref="Dispose"/> wake the polling thread while it waits.
      /// </summary>
      private readonly object stopGate = new object();

      // A dedicated thread (instead of a timer or the thread pool) keeps the polling interval as accurate as possible.
      private readonly Thread pollingThread;

      // Only read or written while holding stopGate.
      private bool disposed = false;

      /// <summary>
      /// Optional factory for a polling message. It is invoked on the polling thread whenever a polling
      /// cycle finds the queue empty; a null result means nothing is sent in that cycle.
      /// </summary>
      public Func<TMessage> PollingMsgProducer { get; set; }

      /// <summary>
      /// by ms, Gets the polling interval for the internal polling loop.
      /// </summary>
      public int PollingInterval { get; }

      /// <summary>
      /// Creates the instance and immediately starts the polling thread.
      /// </summary>
      /// <param name="incoming">forwarded to the base class constructor.</param>
      /// <param name="activelyPollingInterval">by ms, the time to wait between two polling cycles.</param>
      /// <param name="services">forwarded to the base class constructor.</param>
      /// <exception cref="ArgumentOutOfRangeException">the interval is less than -1 (Timeout.Infinite).</exception>
      public TimeWindowWithActivePollingOutgoing(IIncoming<TMessage> incoming, int activelyPollingInterval, IServiceProvider services) : base(incoming, services)
      {
          if (activelyPollingInterval < Timeout.Infinite)
          {
              throw new ArgumentOutOfRangeException(nameof(activelyPollingInterval), "Polling interval must be -1 (infinite) or a non-negative number of milliseconds.");
          }

          this.PollingInterval = activelyPollingInterval;

          // The loop is deliberately synchronous: an async lambda handed to Thread is async void, so the
          // thread would end at the first await and the loop would silently continue on the thread pool.
          this.pollingThread = new Thread(this.PollingLoop)
          {
              // Background, so a forgotten Dispose() does not keep the process alive.
              IsBackground = true,
              Name = "TimeWindowWithActivePollingOutgoing polling"
          };
          this.pollingThread.Start();
      }

      /// <summary>
      /// Body of the polling thread: waits one interval, sends one item, repeats until disposed.
      /// </summary>
      private void PollingLoop()
      {
          while (this.WaitForNextCycle())
          {
              try
              {
                  this.SendNext();
              }
              catch (Exception ex)
              {
                  // An exception escaping a thread's entry method terminates the process,
                  // so log the failure and carry on with the next polling cycle.
                  logger.Error("TimeWindowWithActivePollingOutgoing, polling cycle failed and will continue with next cycle, exception: " + ex);
              }
          }
      }

      /// <summary>
      /// Blocks for one polling interval, or less if <see cref="Dispose"/> is called meanwhile.
      /// </summary>
      /// <returns>true to run another polling cycle, false once the instance has been disposed.</returns>
      private bool WaitForNextCycle()
      {
          lock (this.stopGate)
          {
              if (!this.disposed)
              {
                  // Releases stopGate while waiting; returns early when Dispose() pulses.
                  Monitor.Wait(this.stopGate, this.PollingInterval);
              }

              return !this.disposed;
          }
      }

      /// <summary>
      /// Sends the oldest queued item through the matching base class method; when the queue is empty,
      /// sends the message produced by <see cref="PollingMsgProducer"/> instead (if any).
      /// </summary>
      private void SendNext()
      {
          if (this.outgoingMessageQueue.TryDequeue(out object pending))
          {
              if (pending is TMessage plainMessage)
              {
                  base.Write(plainMessage);
              }
              else if (pending is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncRequest)
              {
                  base.WriteAsync(asyncRequest.Item1, asyncRequest.Item2, asyncRequest.Item3, asyncRequest.Item4);
              }
              else if (pending is Tuple<TMessage, object> messageWithParameter)
              {
                  base.Write(messageWithParameter.Item1, messageWithParameter.Item2);
              }

              return;
          }

          // Read the property once so it cannot turn null between the check and the call.
          var producer = this.PollingMsgProducer;
          var pollingMessage = producer?.Invoke();
          if (pollingMessage != null)
          {
              base.Write(pollingMessage);
          }
      }

      /// <summary>
      /// Called after every enqueue. Above <see cref="maxQueueLength"/> it logs the queue content as an error
      /// and drops the oldest third of the capacity; above one third of the capacity it only logs an info line.
      /// </summary>
      private void HandleQueueMaxLengthReachedError()
      {
          int queuedCount = this.outgoingMessageQueue.Count;

          // The overflow check must come first: any count above the max is also above 1/3 of it.
          if (queuedCount > maxQueueLength)
          {
              /* let's dump inside elements to see what happened.*/
              string logStr =
                  string.Join("\r\n====next=======\r\n", this.outgoingMessageQueue.OfType<TMessage>().Select(s => s.ToLogString()))
                  + string.Join("\r\n", this.outgoingMessageQueue.OfType<Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>>().Select(s => s.Item1.ToLogString()))
                  + string.Join("\r\n", this.outgoingMessageQueue.OfType<Tuple<TMessage, object>>().Select(s => s.Item1.ToLogString()));

              /* No better way to handle this case than dropping the oldest items (1/3 of the capacity). */
              for (int i = 0; i < maxQueueLength / 3; i++)
              {
                  this.outgoingMessageQueue.TryDequeue(out object dropped);
              }

              logger.Error("TimeWindowWithActivePollingOutgoing, Will clear 1/3 of the queue since Max message queue length: " + maxQueueLength
                  + " reached, elements are: " + logStr);
          }
          else if (queuedCount > maxQueueLength / 3)
          {
              logger.Info("TimeWindowWithActivePollingOutgoing, queue reached its 1/3 capacity, just inform and will do nothing.");
          }
      }

      /// <summary>
      /// Write a msg to queue, and actual write action will be performed on the polling thread.
      /// </summary>
      /// <param name="message">the message to queue.</param>
      public override void Write(TMessage message)
      {
          this.outgoingMessageQueue.Enqueue(message);
          this.HandleQueueMaxLengthReachedError();
      }

      /// <summary>
      /// Queues the request; the actual base WriteAsync call will be performed on the polling thread.
      /// </summary>
      /// <param name="request">passed through to the base WriteAsync.</param>
      /// <param name="responseCapture">passed through to the base WriteAsync.</param>
      /// <param name="callback">passed through to the base WriteAsync.</param>
      /// <param name="timeout">passed through to the base WriteAsync.</param>
      public override void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
      {
          this.outgoingMessageQueue.Enqueue(new Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>(request, responseCapture, callback, timeout));
          this.HandleQueueMaxLengthReachedError();
      }

      /// <summary>
      /// Queues the message together with its extra parameter; the actual write will be performed on the polling thread.
      /// </summary>
      /// <param name="message">the message to queue.</param>
      /// <param name="extraControlParameter">passed through to the base Write.</param>
      public override void Write(TMessage message, object extraControlParameter)
      {
          this.outgoingMessageQueue.Enqueue(new Tuple<TMessage, object>(message, extraControlParameter));
          this.HandleQueueMaxLengthReachedError();
      }

      /// <summary>
      /// Stops the polling loop. Safe to call more than once. Items still in the queue are not sent.
      /// </summary>
      public void Dispose()
      {
          lock (this.stopGate)
          {
              this.disposed = true;
              // Wake the polling thread so it exits now instead of after the current interval.
              Monitor.PulseAll(this.stopGate);
          }
      }
  }
  168. }