Outgoing.cs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. using Edge.Core.Parser;
  2. using Edge.Core.Parser.BinaryParser.MessageEntity;
  3. using Microsoft.Extensions.DependencyInjection;
  4. using Microsoft.Extensions.Logging;
  5. using Microsoft.Extensions.Logging.Abstractions;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Linq;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. namespace Edge.Core.Processor
  12. {
  13. public class Outgoing<TRaw, TMessage> : IOutgoing<TRaw, TMessage> where TMessage : MessageBase
  14. {
  15. //static NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Communicator");
  16. static ILogger logger = NullLogger.Instance;
  17. protected IIncoming<TMessage> incoming;
  18. public event EventHandler<OutgoingEventArg<TMessage>> OnWriting;
  19. public Outgoing(IIncoming<TMessage> incoming)
  20. {
  21. this.incoming = incoming;
  22. }
  23. public Outgoing(IIncoming<TMessage> incoming, IServiceProvider services)
  24. {
  25. this.incoming = incoming;
  26. if (services != null)
  27. {
  28. var loggerFactory = services.GetRequiredService<ILoggerFactory>();
  29. logger = loggerFactory.CreateLogger("Communicator");
  30. }
  31. }
  32. /// <summary>
  33. ///
  34. /// </summary>
  35. /// <param name="request">request to remote peer, which should trigger a response from remote peer.</param>
  36. /// <param name="responseCapture">predict of response correlated to request, first is the request, 2nd is the pending for capture response</param>
  37. /// <param name="callback">when response catpured, or timed, this callback will be called</param>
  38. /// <param name="timeout">time out, by ms</param>
  39. public virtual void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
  40. {
  41. int isCallbackCalled = 0;
  42. EventHandler eventHandler = null;
  43. eventHandler = (s, a) =>
  44. {
  45. var incoming = s as Incoming<TMessage>;
  46. if (responseCapture(request, incoming.Message))
  47. {
  48. this.incoming.OnMessageIncoming -= eventHandler;
  49. if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) == 0)
  50. {
  51. this.incoming.DisablePropagate = true;
  52. try
  53. {
  54. callback(request, incoming.Message);
  55. }
  56. catch (Exception exx) { logger.LogError("Outgoing.WriteAsync(...,...,...,...) exceptioned in callback, detail: " + exx); }
  57. }
  58. }
  59. };
  60. this.incoming.OnMessageIncoming += eventHandler;
  61. var _ = new System.Timers.Timer(timeout);
  62. _.Elapsed += (__, ___) =>
  63. {
  64. _.Stop();
  65. this.incoming.OnMessageIncoming -= eventHandler;
  66. if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) == 0)
  67. {
  68. callback(request, null);
  69. }
  70. };
  71. _.Start();
  72. var safe = this.OnWriting;
  73. safe?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = request });
  74. }
  75. public virtual Task<TMessage> WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, int timeout)
  76. {
  77. var tcs = new TaskCompletionSource<TMessage>();
  78. this.WriteAsync(request, responseCapture, (oriRequest, response) =>
  79. {
  80. if (response != null)
  81. tcs.SetResult(response);
  82. else
  83. tcs.SetResult(null);
  84. }, timeout);
  85. return tcs.Task;
  86. }
  87. public virtual void Write(TMessage message)
  88. {
  89. var safe = this.OnWriting;
  90. safe?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message });
  91. }
  92. /// <summary>
  93. ///
  94. /// </summary>
  95. /// <param name="message"></param>
  96. /// <param name="redirectParameter"></param>
  97. public virtual void Write(TMessage message, object extraControlParameter)
  98. {
  99. var safe = this.OnWriting;
  100. safe?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message, ExtraControlParameter = extraControlParameter });
  101. }
  102. }
  103. }