Outgoing.cs 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. using Edge.Core.Parser;
  2. using Edge.Core.Parser.BinaryParser.MessageEntity;
  3. using Microsoft.Extensions.DependencyInjection;
  4. using Microsoft.Extensions.Logging;
  5. using Microsoft.Extensions.Logging.Abstractions;
  6. using Newtonsoft.Json;
  7. using System;
  8. using System.Collections.Generic;
  9. using System.Linq;
  10. using System.Threading;
  11. using System.Threading.Tasks;
  12. namespace Edge.Core.Processor
  13. {
  14. public class Outgoing<TRaw, TMessage> : IOutgoing<TRaw, TMessage> where TMessage : MessageBase
  15. {
  16. static NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Outgoing");
  17. //static ILogger logger = NullLogger.Instance;
  18. protected IIncoming<TMessage> incoming;
  19. public event EventHandler<OutgoingEventArg<TMessage>> OnWriting;
  20. //接收当前连接是否正常
  21. private TaskCompletionSource<bool> isConnectTask;
  22. public Outgoing(IIncoming<TMessage> incoming)
  23. {
  24. this.incoming = incoming;
  25. }
  26. public Outgoing(IIncoming<TMessage> incoming, IServiceProvider services)
  27. {
  28. this.incoming = incoming;
  29. if (services != null)
  30. {
  31. var loggerFactory = services.GetRequiredService<ILoggerFactory>();
  32. //logger = loggerFactory.CreateLogger("Communicator");
  33. }
  34. }
  35. /// <summary>
  36. /// 通知当前链接已连接
  37. /// </summary>
  38. public void OnConnect()
  39. {
  40. logger.Info("当前已连接");
  41. isConnectTask = new TaskCompletionSource<bool>();
  42. }
  43. /// <summary>
  44. /// 通知当前链接已断开
  45. /// </summary>
  46. public void OnDisconnect()
  47. {
  48. logger.Info("当前已断开连接");
  49. //isConnectTask.SetResult(false);
  50. }
  51. /// <summary>
  52. ///
  53. /// </summary>
  54. /// <param name="request">request to remote peer, which should trigger a response from remote peer.</param>
  55. /// <param name="responseCapture">predict of response correlated to request, first is the request, 2nd is the pending for capture response</param>
  56. /// <param name="callback">when response catpured, or timed, this callback will be called</param>
  57. /// <param name="timeout">time out, by ms</param>
  58. public virtual void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
  59. {
  60. int isCallbackCalled = 0;
  61. EventHandler eventHandler = null;
  62. eventHandler = (s, a) =>
  63. {
  64. var incoming = s as Incoming<TMessage>;
  65. if (responseCapture(request, incoming.Message))
  66. {
  67. this.incoming.OnMessageIncoming -= eventHandler;
  68. if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) == 0)
  69. {
  70. this.incoming.DisablePropagate = true;
  71. try
  72. {
  73. callback(request, incoming.Message);
  74. }
  75. //catch (Exception exx) { logger.LogError("Outgoing.WriteAsync(...,...,...,...) exceptioned in callback, detail: " + exx); }
  76. catch (Exception exx) { logger.Error("Outgoing.WriteAsync(...,...,...,...) exceptioned in callback, detail: " + exx); }
  77. }
  78. }
  79. };
  80. this.incoming.OnMessageIncoming += eventHandler;
  81. var _ = new System.Timers.Timer(timeout);
  82. _.Elapsed += (__, ___) =>
  83. {
  84. _.Stop();
  85. this.incoming.OnMessageIncoming -= eventHandler;
  86. if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) == 0)
  87. {
  88. callback(request, null);
  89. }
  90. };
  91. _.Start();
  92. var safe = this.OnWriting;
  93. safe?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = request });
  94. }
  95. public virtual Task<TMessage> WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, int timeout)
  96. {
  97. var tcs = new TaskCompletionSource<TMessage>();
  98. this.WriteAsync(request, responseCapture, (oriRequest, response) =>
  99. {
  100. if (response != null)
  101. tcs.SetResult(response);
  102. else
  103. tcs.SetResult(null);
  104. }, timeout);
  105. return tcs.Task;
  106. }
  107. public virtual async Task<WriteRepsonse> WriteAsyncAndCheckIsConnect(TMessage request, Func<TMessage, TMessage, bool> responseCapture, int timeout)
  108. {
  109. if (isConnectTask == null)
  110. {
  111. return new WriteRepsonse()
  112. {
  113. ResponseType = WriteResponseType.DISCONNECT,
  114. Data = false,
  115. };
  116. }
  117. Task<TMessage> sendTask = WriteAsync(request, responseCapture, timeout);
  118. Task responseTask = await Task.WhenAny(sendTask, isConnectTask.Task);
  119. var type = WriteResponseType.TIME_OUT;
  120. object? result = null;
  121. if (responseTask == sendTask)
  122. {
  123. result = await sendTask;
  124. if (result != null) type = WriteResponseType.OK;
  125. }
  126. if(responseTask == isConnectTask.Task)
  127. {
  128. result = await isConnectTask.Task;
  129. type = WriteResponseType.DISCONNECT;
  130. }
  131. return new WriteRepsonse()
  132. {
  133. ResponseType = type,
  134. Data = result,
  135. };
  136. }
  137. public virtual void Write(TMessage message)
  138. {
  139. var safe = this.OnWriting;
  140. safe?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message });
  141. }
  142. /// <summary>
  143. ///
  144. /// </summary>
  145. /// <param name="message"></param>
  146. /// <param name="redirectParameter"></param>
  147. public virtual void Write(TMessage message, object extraControlParameter)
  148. {
  149. var safe = this.OnWriting;
  150. safe?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message, ExtraControlParameter = extraControlParameter });
  151. }
  152. }
  153. public class WriteRepsonse
  154. {
  155. public WriteResponseType ResponseType { get; set; }
  156. public object? Data { get; set; }
  157. }
  158. public enum WriteResponseType
  159. {
  160. TIME_OUT,
  161. DISCONNECT,
  162. OK
  163. }
  164. }