TcpClientCommunicator.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. using Edge.Core.Parser.BinaryParser.MessageEntity;
  2. using Edge.Core.Parser.BinaryParser.Util;
  3. using Edge.Core.Processor.Dispatcher.Attributes;
  4. using Microsoft.Extensions.DependencyInjection;
  5. using Microsoft.Extensions.Logging;
  6. using Microsoft.Extensions.Logging.Abstractions;
  7. using System;
  8. using System.Collections.Generic;
  9. using System.Linq;
  10. using System.Net.Sockets;
  11. using System.Runtime.InteropServices;
  12. using System.Text;
  13. using System.Threading;
  14. using System.Threading.Tasks;
  15. namespace Edge.Core.Processor.Communicator
  16. {
  17. [MetaPartsDescriptor(
  18. "lang-zh-cn:Tcp(本机为客户端)通讯器lang-en-us:Tcp(as client) communicator",
  19. "lang-zh-cn:基于TCP/IP技术的通讯器, FC作为客户端主动连接服务器端lang-en-us:TCP/IP based communicator, FC as the client side and connecting to Server side")]
  20. public class TcpClientCommunicator<T> : ICommunicator<byte[], T> where T : MessageTemplateBase
  21. {
  /// <summary>
  /// Logger for connection and traffic diagnostics. Starts as a no-op logger and is
  /// replaced in the constructor when a service provider is supplied.
  /// </summary>
  protected ILogger logger = NullLogger.Instance;

  /// <summary>
  /// Size, in bytes, of the buffer allocated for each asynchronous read from the tcp stream.
  /// </summary>
  public static int tcpReceiveBufferSize = 1500;

  /// <summary>Deserializes each cut message into a <typeparamref name="T"/>.</summary>
  private readonly Edge.Core.Parser.BinaryParser.ParserBase parser;

  /// <summary>Host name or IP address of the remote tcp server.</summary>
  private readonly string remoteTcpServerListeningIpAddress;

  /// <summary>Port number the remote tcp server is listening on.</summary>
  private readonly int remoteTcpServerListeningPortNumber;

  /// <summary>
  /// The current connection; it is disposed and re-created each time the connection is rebuilt.
  /// </summary>
  protected TcpClient tcpClient;

  /// <summary>
  /// 0 is for false, 1 is for true.
  /// Switched with Interlocked so that only one background (re)connect loop runs at a time.
  /// </summary>
  private int isInTcpConnecting = 0;

  /// <summary>Serializes the outgoing send in Write(...).</summary>
  private readonly object syncObject = new object();

  /// <summary>Serializes feeding of received bytes into the message cutter.</summary>
  private readonly object syncObject_Feed = new object();

  /// <summary>When true, tcp keep-alive options are applied on non-Windows platforms after connecting.</summary>
  private bool enableTcpKeepAlive = true;

  /// <summary>
  /// 0 for not started, 1 for started.
  /// </summary>
  private int isStarted = 0;

  /// <summary>
  /// Set by Dispose() and polled by the background (re)connect loop, which runs on a
  /// thread-pool thread; volatile so that the loop observes the write made by the disposing thread.
  /// </summary>
  protected volatile bool disposed = false;

  /// <summary>Splits the incoming byte stream into whole messages.</summary>
  private readonly IMessageCutter<byte[]> messageCutter;

  /// <summary>Raised after a connection to the remote server has been established.</summary>
  public event EventHandler OnConnected;

  /// <summary>Raised when a send or a receive fails, before the connection is rebuilt.</summary>
  public event EventHandler OnDisconnected;

  /// <summary>Raised when the message cutter reports invalid data or a cut message fails to deserialize.</summary>
  public event EventHandler<CommunicatorErrorMessageReadEventArg> OnErrorMessageRead;

  /// <summary>
  /// Raised before an outgoing message is sent; a handler may set Continue to false to cancel the send.
  /// </summary>
  public event EventHandler<CommunicatorEventArg<byte[], T>> OnRawDataWriting;

  /// <summary>Raised for each message that was cut from the stream and deserialized.</summary>
  public event EventHandler<CommunicatorEventArg<byte[], T>> OnDataReceived;

  /// <summary>
  /// fired once raw data read from underlying tcp connection, without message cutting yet.
  /// </summary>
  protected Action<byte[]> OnRawDataReceived;

  /// <summary>
  /// Identity of this communicator; the constructor initializes it to "address:port" of the remote server.
  /// </summary>
  public string Identity { get; set; }
  /// <summary>
  /// Creates a communicator that connects, as the tcp client, to the given remote server.
  /// No connection is attempted here; this only selects the logger, stores the settings and
  /// subscribes to the message cutter's events.
  /// </summary>
  /// <param name="binaryMsgCutter">Cuts the received byte stream into messages; may be null, in which case no cutter events are subscribed.</param>
  /// <param name="parser">Parser used to deserialize each cut message.</param>
  /// <param name="remoteTcpServerListeningIpAddress">Address of the remote tcp server.</param>
  /// <param name="remoteTcpServerTcpListeningPortNumber">Port of the remote tcp server.</param>
  /// <param name="customLoggerFileName">
  /// Empty or null for the shared "Communicator" logger; "*" for a logger named after the remote
  /// address and port; any other value for a logger named after that value.
  /// </param>
  /// <param name="services">Service provider used to resolve the logger factory; when null the no-op logger is kept.</param>
  [ParamsJsonSchemas("TcpClientCommunicatorCtorSchema")]
  public TcpClientCommunicator(IMessageCutter<byte[]> binaryMsgCutter,
      Edge.Core.Parser.BinaryParser.ParserBase parser,
      string remoteTcpServerListeningIpAddress,
      int remoteTcpServerTcpListeningPortNumber,
      string customLoggerFileName,
      IServiceProvider services)
  {
      this.Identity = remoteTcpServerListeningIpAddress + ":" + remoteTcpServerTcpListeningPortNumber;

      if (services != null)
      {
          var factory = services.GetRequiredService<ILoggerFactory>();
          this.logger = factory.CreateLogger("Communicator");
          if (!string.IsNullOrEmpty(customLoggerFileName))
          {
              // "*" asks for a logger dedicated to this remote endpoint.
              var loggerSuffix = customLoggerFileName == "*"
                  ? remoteTcpServerListeningIpAddress + "_" + remoteTcpServerTcpListeningPortNumber
                  : customLoggerFileName;
              this.logger = factory.CreateLogger("DynamicPrivate_Comm_" + loggerSuffix);
          }
      }

      this.messageCutter = binaryMsgCutter;
      this.parser = parser;
      this.remoteTcpServerListeningIpAddress = remoteTcpServerListeningIpAddress;
      this.remoteTcpServerListeningPortNumber = remoteTcpServerTcpListeningPortNumber;

      if (this.messageCutter == null)
      {
          return;
      }

      // Invalid bytes reported by the cutter are logged and surfaced as an error event.
      this.messageCutter.OnInvalidMessageRead += (sender, invalid) =>
      {
          var text = $"Message from {this.remoteTcpServerListeningIpAddress}:{this.remoteTcpServerListeningPortNumber} Read Invalid data, detail: {(invalid?.Message ?? "")}";
          this.logger.LogInformation(text);
          this.OnErrorMessageRead?.Invoke(this, new CommunicatorErrorMessageReadEventArg(null, text));
      };

      // Every cut message is deserialized and then handed to the OnDataReceived subscribers.
      this.messageCutter.OnMessageCut += (sender, args) =>
      {
          var received = new CommunicatorEventArg<byte[], T>();
          try
          {
              received.Data = this.messageCutter.Message;
              received.Message = this.parser.Deserialize(this.messageCutter.Message.ToArray()) as T;
              if (logger.IsEnabled(LogLevel.Debug))
              {
                  this.logger.LogDebug(" Parsed: " + received.Message.ToLogString());
              }
          }
          catch (Exception parseError)
          {
              var text = "Message from " + this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber
                  + " exceptioned in deserilaizing bytes:\r\n 0x" + this.messageCutter.Message.ToHexLogString()
                  + "\r\n exception detail:\r\n" + parseError;
              this.logger.LogError(text);
              this.OnErrorMessageRead?.Invoke(this, new CommunicatorErrorMessageReadEventArg(this.messageCutter.Message, text));
              return;
          }

          try
          {
              this.OnDataReceived?.Invoke(this, received);
          }
          catch (Exception handlerError)
          {
              // A failing subscriber is logged and must not break the receive path.
              this.logger.LogError(this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber
                  + " exceptioned in handle message:\r\n" + received.Message.ToLogString()
                  + "\r\n exceptioned detail: \r\n" + handlerError);
          }
      };
  }
  /// <summary>
  /// Stops the communicator: marks it as disposed and closes the current tcp connection, if any.
  /// </summary>
  public void Dispose()
  {
      // Raise the flag before closing. Closing the socket completes the pending read with an
      // error, and that callback schedules the background reconnect loop; the loop only stays
      // idle if it already sees disposed == true at that point.
      this.disposed = true;
      this.tcpClient?.Close();
  }
  /// <summary>
  /// Starts the communicator by scheduling the background connect loop. Only the first call
  /// has any effect.
  /// </summary>
  /// <returns>
  /// A completed task holding true when this call scheduled the connect loop; false when the
  /// communicator had already been started or scheduling the loop threw.
  /// </returns>
  public virtual Task<bool> Start()
  {
      // Atomically flip 0 -> 1 so that concurrent or repeated calls start the loop only once.
      if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) != 0)
      {
          return Task.FromResult(false);
      }

      try
      {
          this.SetupTcpClientConnectionInBackground();
          return Task.FromResult(true);
      }
      catch (Exception startError)
      {
          // Previously swallowed silently; record why the start failed.
          this.logger.LogError("Start tcp client communicator to "
              + this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber
              + " exceptioned, detail: " + startError);
          return Task.FromResult(false);
      }
  }
  /// <summary>
  /// Serializes <paramref name="message"/> and sends it over the current tcp connection.
  /// OnRawDataWriting is raised first, and a handler may cancel the send by setting Continue to false.
  /// </summary>
  /// <param name="message">The message to send.</param>
  /// <returns>
  /// true when the bytes were handed to the socket. false when the message is null, when
  /// serialization or an OnRawDataWriting handler threw, when a handler cancelled the send, when
  /// no connection exists yet, when a (re)connect is in progress, or when the send itself failed
  /// (in which case OnDisconnected is raised and a reconnect is scheduled).
  /// </returns>
  public bool Write(T message)
  {
      if (message == null)
      {
          return false;
      }

      byte[] rawData;
      try
      {
          rawData = message.ToCommonByteArray();
          var arg = new CommunicatorEventArg<byte[], T>() { Data = rawData, Message = message, Continue = true };
          this.OnRawDataWriting?.Invoke(this, arg);
          if (this.tcpClient == null || !arg.Continue)
          {
              this.logger.LogError("Write failed, this.tcpClient is null? " + (this.tcpClient is null));
              return false;
          }
      }
      catch (Exception exx)
      {
          // Building the log text must not throw on top of the original failure.
          var msgLogStr = "";
          try
          {
              msgLogStr = message.ToLogString();
          }
          catch
          {
              msgLogStr = "exceptioned for get ToLogString()";
          }

          this.logger.LogError("Tcp Write failed in serialize or event raise for msg: " + message.GetType() + " -> " + msgLogStr + "\r\n detail: " + exx);
          return false;
      }

      lock (this.syncObject)
      {
          try
          {
              // While the connection is being rebuilt there is nothing usable to send on.
              if (this.isInTcpConnecting == 1)
              {
                  return false;
              }

              if (logger.IsEnabled(LogLevel.Debug))
              {
                  this.logger.LogDebug("TCP to " + this.tcpClient.Client.RemoteEndPoint + " Outgoing--->: " + message.ToLogString() + "\r\n 0x" + rawData.ToHexLogString());
              }

              this.tcpClient.Client.Send(rawData);
              return true;
          }
          catch (Exception exx)
          {
              this.logger.LogError("Send tcp msg to "
                  + this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber + " Write(...) exceptioned, will re-build connection, detail: " + exx);
              var _ = this.OnDisconnected;
              _?.Invoke(this, null);
              this.SetupTcpClientConnectionInBackground();

              // The message was not sent, so the caller must not be told it succeeded.
              return false;
          }
      }
  }
  /// <summary>
  /// Overload taking an extra control parameter; this communicator does not implement it.
  /// </summary>
  /// <param name="message">Unused.</param>
  /// <param name="extraControlParameter">Unused.</param>
  /// <returns>Never returns.</returns>
  /// <exception cref="NotImplementedException">Always thrown.</exception>
  public bool Write(T message, object extraControlParameter)
      => throw new NotImplementedException();
  /// <summary>
  /// Connects to the remote tcp server, with this side acting as the tcp client, on a
  /// thread-pool thread. The loop retries every 3 seconds until a connection is established or
  /// the communicator is disposed. On success it starts the first asynchronous read and raises
  /// OnConnected. Calls made while a connect loop is already running are ignored.
  /// </summary>
  private void SetupTcpClientConnectionInBackground()
  {
      // Atomically flip 0 -> 1 so that only one connect loop runs at a time.
      if (Interlocked.CompareExchange(ref this.isInTcpConnecting, 1, 0) != 0)
      {
          return;
      }

      ThreadPool.QueueUserWorkItem(o =>
      {
          while (!this.disposed)
          {
              try
              {
                  this.logger.LogInformation("FCC as the tcp client side is establishing to remote tcp server "
                      + this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber);
                  if (this.tcpClient != null)
                  {
                      this.tcpClient.Dispose();
                  }

                  // This constructor connects synchronously and throws when the connect fails.
                  var client = new TcpClient(this.remoteTcpServerListeningIpAddress, this.remoteTcpServerListeningPortNumber);
                  this.tcpClient = client;

                  if (this.disposed)
                  {
                      // Dispose() ran while the blocking connect was in progress: do not keep a
                      // live socket, start reading, or announce a connection on a disposed object.
                      client.Dispose();
                      return;
                  }

                  // Keep-alive tuning is intentionally applied on non-Windows platforms only.
                  if (this.enableTcpKeepAlive && !RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
                  {
                      //overall switch to enable the feature.
                      client.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
                      //when entering tcp idle, how much time wait before send a KeepAlive package.
                      client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, 20);
                      //when in tcp idle, and failed to received previous KeepAlive response, sequence of KeepAlive packages will send by this interval, until response received, or TcpKeepAliveRetryCount reached.
                      client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, 8);
                      //when in tcp idle, how many KeepAlive package response have not received will treat as tcp broken, and trigger tcp disconnected exception.
                      client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, 3);
                  }

                  logger.LogInformation("FCC has established to remote tcp server("
                      + this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber
                      + "), local port: " + client.Client.LocalEndPoint.ToString());

                  // Use buffer.Length for the read size: tcpReceiveBufferSize is a mutable static and
                  // could change between the allocation and the BeginRead call.
                  byte[] buffer = new byte[tcpReceiveBufferSize];
                  client.GetStream().BeginRead(buffer, 0,
                      buffer.Length,
                      this.Tcp_DataReceived, new System.Tuple<TcpClient, byte[]>(client, buffer));
                  this.isInTcpConnecting = 0;
                  var _ = this.OnConnected;
                  _?.Invoke(this, null);
                  return;
              }
              catch (Exception exxx)
              {
                  this.logger.LogError("Setup tcp client which connect to remote tcp server("
                      + this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber
                      + ") exceptioned: " + exxx + "\r\n Will keep retrying...");
                  Thread.Sleep(3000);
              }
          }
      });
  }
  /// <summary>
  /// Completion callback for the asynchronous stream read. Completes the read, passes the
  /// received bytes to OnRawDataReceived and to the message cutter, then starts the next read.
  /// A failed or zero-length read raises OnDisconnected and schedules a reconnect.
  /// </summary>
  /// <param name="ar">
  /// Result of the BeginRead call; its AsyncState is the (TcpClient, buffer) tuple that was
  /// passed when the read was started.
  /// </param>
  private void Tcp_DataReceived(IAsyncResult ar)
  {
      System.Tuple<TcpClient, byte[]> parameter = null;
      byte[] dataRead = null;
      int bytesReadCount = 0;
      try
      {
          parameter = (System.Tuple<TcpClient, byte[]>)(ar.AsyncState);
          dataRead = parameter.Item2;

          // The read was started with NetworkStream.BeginRead, so it has to be completed with the
          // matching NetworkStream.EndRead rather than with Socket.EndReceive.
          bytesReadCount = parameter.Item1.GetStream().EndRead(ar);
          if (bytesReadCount == 0)
          {
              throw new Exception("tcp client received 0 count data which indicates the connection is broken, trigger disconnection");
          }
      }
      catch (Exception tcpExp)
      {
          this.logger.LogInformation("Tcp client EndReceive from " + this.remoteTcpServerListeningIpAddress + " exceptioned: " + tcpExp + "\r\n will re-build connection...");
          var _ = this.OnDisconnected;
          _?.Invoke(this, null);
          this.SetupTcpClientConnectionInBackground();
          return;
      }

      try
      {
          if (logger.IsEnabled(LogLevel.Debug))
          {
              this.logger.LogDebug("TCP from " + parameter.Item1.Client.RemoteEndPoint.ToString() + " <---Incoming: 0x" + dataRead.Take(bytesReadCount).ToHexLogString());
          }

          try
          {
              // Only the first bytesReadCount bytes of the buffer hold received data.
              var data = dataRead.Take(bytesReadCount).ToArray();
              this.OnRawDataReceived?.Invoke(data);
              lock (this.syncObject_Feed)
              {
                  this.messageCutter.Feed(data);
              }
          }
          catch (Exception ex)
          {
              // A parsing failure is logged only; reading continues on the same connection.
              this.logger.LogError(" Exception in TCP Parsing bytes: 0x" + dataRead.Take(bytesReadCount).ToHexLogString() + ", detail: \r\n" + ex.ToString());
          }

          // Use buffer.Length for the read size: tcpReceiveBufferSize is a mutable static and
          // could change between the allocation and the BeginRead call.
          byte[] buffer = new byte[tcpReceiveBufferSize];
          parameter.Item1.GetStream().BeginRead(buffer, 0,
              buffer.Length,
              this.Tcp_DataReceived, new System.Tuple<TcpClient, byte[]>(parameter.Item1, buffer));
      }
      catch (Exception ex)
      {
          this.logger.LogError(" Exception in Tcp_DataReceived(...):\r\n" + ex.ToString() + "\r\n will rebuild the connection until succeed.");
          var _ = this.OnDisconnected;
          _?.Invoke(this, null);
          this.SetupTcpClientConnectionInBackground();
          return;
      }
  }
  294. }
  295. }