Outgoing.cs 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. using Edge.Core.Parser;
  2. using Edge.Core.Parser.BinaryParser.MessageEntity;
  3. using Microsoft.Extensions.DependencyInjection;
  4. using Microsoft.Extensions.Logging;
  5. using Microsoft.Extensions.Logging.Abstractions;
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Linq;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. namespace Edge.Core.Processor
  12. {
  /// <summary>
  /// Sending side of a message channel. This class performs no I/O itself: outbound messages
  /// are published through <see cref="OnWriting"/>. The request/response overloads additionally
  /// listen on the paired incoming channel for a matching reply, bounded by a timeout.
  /// </summary>
  /// <typeparam name="TRaw">Raw type required by <c>IOutgoing&lt;TRaw, TMessage&gt;</c>; not referenced by any member of this class.</typeparam>
  /// <typeparam name="TMessage">Parsed message type.</typeparam>
  public class Outgoing<TRaw, TMessage> : IOutgoing<TRaw, TMessage> where TMessage : MessageBase
  {
      // Static: shared by every instance of the same closed generic type. It is replaced
      // whenever an instance is constructed with a non-null service provider.
      static ILogger logger = NullLogger.Instance;

      protected IIncoming<TMessage> incoming;

      /// <summary>Raised once for every outbound message handed to this instance.</summary>
      public event EventHandler<OutgoingEventArg<TMessage>> OnWriting;

      // Connection-state signal: null until OnConnect is called, completed with false by OnDisconnect.
      private TaskCompletionSource<bool> isConnectTask;

      /// <summary>Creates an instance bound to <paramref name="incoming"/>; the logger is left unchanged.</summary>
      /// <param name="incoming">Channel on which responses to written requests are observed.</param>
      public Outgoing(IIncoming<TMessage> incoming)
      {
          this.incoming = incoming;
      }

      /// <summary>
      /// Creates an instance bound to <paramref name="incoming"/> and, when a service provider is
      /// supplied, switches the shared logger to the "Communicator" category.
      /// </summary>
      /// <param name="incoming">Channel on which responses to written requests are observed.</param>
      /// <param name="services">Optional provider; when non-null it must be able to resolve <c>ILoggerFactory</c>.</param>
      public Outgoing(IIncoming<TMessage> incoming, IServiceProvider services)
      {
          this.incoming = incoming;
          if (services != null)
          {
              var loggerFactory = services.GetRequiredService<ILoggerFactory>();
              logger = loggerFactory.CreateLogger("Communicator");
          }
      }

      /// <summary>
      /// Notifies this instance that the link is connected. Starts a fresh connection-state signal.
      /// </summary>
      public void OnConnect()
      {
          // RunContinuationsAsynchronously keeps awaiters of this signal from resuming inline
          // on whichever thread later calls OnDisconnect.
          isConnectTask = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
      }

      /// <summary>
      /// Notifies this instance that the link has been disconnected. Safe to call before
      /// <see cref="OnConnect"/> and safe to call more than once.
      /// </summary>
      public void OnDisconnect()
      {
          // TrySetResult: a repeated disconnect must not throw InvalidOperationException,
          // and the null-conditional covers a disconnect that arrives before any connect.
          isConnectTask?.TrySetResult(false);
      }

      /// <summary>
      /// Writes <paramref name="request"/> and waits for a correlated response or a timeout.
      /// </summary>
      /// <param name="request">request to remote peer, which should trigger a response from remote peer.</param>
      /// <param name="responseCapture">predicate deciding whether an incoming message answers the request; first argument is the request, second is the incoming candidate.</param>
      /// <param name="callback">invoked at most once: with (request, response) when a response is captured, or with (request, null) when the timeout elapses first. Exceptions it throws are logged and swallowed.</param>
      /// <param name="timeout">time out, by ms; must be positive, otherwise the timer constructor throws.</param>
      public virtual void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
      {
          // Created before subscribing to the incoming channel: if the interval is rejected
          // the exception surfaces here and no handler is left attached.
          // AutoReset = false makes the timer fire a single time.
          var timeoutTimer = new System.Timers.Timer(timeout) { AutoReset = false };

          // 0 = callback not invoked yet, 1 = invoked. Arbitrates the response/timeout race.
          int isCallbackCalled = 0;

          EventHandler eventHandler = null;
          eventHandler = (s, a) =>
          {
              // Ignore senders that do not expose a message instead of dereferencing null.
              if (!(s is Incoming<TMessage> source))
              {
                  return;
              }

              if (!responseCapture(request, source.Message))
              {
                  return;
              }

              this.incoming.OnMessageIncoming -= eventHandler;
              // The response arrived, so the timeout is no longer needed; release the timer.
              timeoutTimer.Dispose();

              if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) != 0)
              {
                  return;
              }

              this.incoming.DisablePropagate = true;
              try
              {
                  callback(request, source.Message);
              }
              catch (Exception exx) { logger.LogError("Outgoing.WriteAsync(...,...,...,...) exceptioned in callback, detail: " + exx); }
          };
          this.incoming.OnMessageIncoming += eventHandler;

          timeoutTimer.Elapsed += (sender, args) =>
          {
              timeoutTimer.Dispose();
              this.incoming.OnMessageIncoming -= eventHandler;

              if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) != 0)
              {
                  return;
              }

              try
              {
                  callback(request, null);
              }
              // Same policy as the response path; without this the failure would go unreported.
              catch (Exception exx) { logger.LogError("Outgoing.WriteAsync(...,...,...,...) exceptioned in timeout callback, detail: " + exx); }
          };
          timeoutTimer.Start();

          OnWriting?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = request });
      }

      /// <summary>
      /// Task-based form of the callback overload.
      /// </summary>
      /// <param name="request">request to remote peer.</param>
      /// <param name="responseCapture">predicate deciding whether an incoming message answers the request.</param>
      /// <param name="timeout">time out, by ms.</param>
      /// <returns>A task that completes with the captured response, or with null when the timeout elapsed first.</returns>
      public virtual Task<TMessage> WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, int timeout)
      {
          // The callback fires on the incoming-message or timer thread; asynchronous
          // continuations keep awaiting code from running inline on (and stalling) that thread.
          var tcs = new TaskCompletionSource<TMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
          this.WriteAsync(request, responseCapture, (oriRequest, response) => tcs.TrySetResult(response), timeout);
          return tcs.Task;
      }

      /// <summary>
      /// Writes <paramref name="request"/> and reports whether a response arrived, the wait timed
      /// out, or the link was reported disconnected while waiting.
      /// </summary>
      /// <param name="request">request to remote peer.</param>
      /// <param name="responseCapture">predicate deciding whether an incoming message answers the request.</param>
      /// <param name="timeout">time out, by ms.</param>
      /// <returns>
      /// DISCONNECT with Data = false when <see cref="OnConnect"/> was never called (nothing is written)
      /// or when the disconnect signal completes first; OK with Data = the response; TIME_OUT with Data = null.
      /// </returns>
      public virtual async Task<WriteRepsonse> WriteAsyncAndCheckIsConnect(TMessage request, Func<TMessage, TMessage, bool> responseCapture, int timeout)
      {
          // Snapshot the field once: OnConnect may swap it while this method is awaiting, and
          // comparing against the new instance afterwards would misreport a disconnect as TIME_OUT.
          var connection = isConnectTask;
          if (connection == null)
          {
              return new WriteRepsonse()
              {
                  ResponseType = WriteResponseType.DISCONNECT,
                  Data = false,
              };
          }

          Task<TMessage> sendTask = WriteAsync(request, responseCapture, timeout);
          Task finished = await Task.WhenAny(sendTask, connection.Task).ConfigureAwait(false);

          if (finished == connection.Task)
          {
              // The pending write is not cancelled here; it ends on its own via its timeout.
              return new WriteRepsonse()
              {
                  ResponseType = WriteResponseType.DISCONNECT,
                  Data = await connection.Task.ConfigureAwait(false),
              };
          }

          TMessage response = await sendTask.ConfigureAwait(false);
          return new WriteRepsonse()
          {
              ResponseType = response != null ? WriteResponseType.OK : WriteResponseType.TIME_OUT,
              Data = response,
          };
      }

      /// <summary>
      /// Publishes <paramref name="message"/> through <see cref="OnWriting"/> without waiting for any response.
      /// </summary>
      /// <param name="message">message to write.</param>
      public virtual void Write(TMessage message)
      {
          OnWriting?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message });
      }

      /// <summary>
      /// Publishes <paramref name="message"/> through <see cref="OnWriting"/> together with an extra control value.
      /// </summary>
      /// <param name="message">message to write.</param>
      /// <param name="extraControlParameter">value passed along unchanged in the event argument's ExtraControlParameter.</param>
      public virtual void Write(TMessage message, object extraControlParameter)
      {
          OnWriting?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message, ExtraControlParameter = extraControlParameter });
      }
  }
  /// <summary>
  /// Result envelope produced by <c>Outgoing.WriteAsyncAndCheckIsConnect</c>.
  /// The spelling of the type name is kept as-is because it is part of the public surface.
  /// </summary>
  public class WriteRepsonse
  {
      /// <summary>How the write attempt ended.</summary>
      public WriteResponseType ResponseType { get; set; }

      /// <summary>
      /// Outcome payload: the response message for OK, null for TIME_OUT,
      /// and the boolean false for DISCONNECT.
      /// </summary>
      public object? Data { get; set; }
  }
  /// <summary>Outcome categories reported in <c>WriteRepsonse.ResponseType</c>.</summary>
  public enum WriteResponseType
  {
      /// <summary>No matching response was captured before the timeout elapsed.</summary>
      TIME_OUT = 0,

      /// <summary>The link was never connected, or was reported disconnected while waiting.</summary>
      DISCONNECT = 1,

      /// <summary>A matching response was captured.</summary>
      OK = 2,
  }
  157. }