TcpServerCommunicator.cs 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. using Edge.Core.Parser.BinaryParser.MessageEntity;
  2. using Edge.Core.Parser.BinaryParser.Util;
  3. using Edge.Core.Processor.Dispatcher.Attributes;
  4. using Microsoft.Extensions.DependencyInjection;
  5. using Microsoft.Extensions.Logging;
  6. using Microsoft.Extensions.Logging.Abstractions;
  7. using System;
  8. using System.Collections.Generic;
  9. using System.Linq;
  10. using System.Net;
  11. using System.Net.Sockets;
  12. using System.Runtime.InteropServices;
  13. using System.Text;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. namespace Edge.Core.Processor.Communicator
  17. {
  18. [MetaPartsDescriptor(
  19. "lang-zh-cn:Tcp(本程序为服务器端)通讯器lang-en-us:Tcp(as server) communicator",
  20. "lang-zh-cn:基于TCP/IP技术的通讯器, FC作为服务器端等待客户端连接lang-en-us:TCP/IP based communicator, FC as the server and wait for connections")]
  21. public class TcpServerCommunicator<T> : ICommunicator<byte[], T> where T : MessageTemplateBase
  22. {
  /// <summary>
  /// Cancellation source whose token is handed to the pending stream read;
  /// cancelling it aborts that read.
  /// </summary>
  private CancellationTokenSource readAsyncCancellationTokenSource;

  /// <summary>
  /// Local time of the last message cut from incoming data (also stamped when a client connects).
  /// Null until then; the watch timer treats null as "just started" and does nothing.
  /// </summary>
  private DateTime? lastReceiveMsgDataFromTcpClientDateTime;

  /// <summary>
  /// Periodic timer that cancels the pending read when no data has been seen for too long.
  /// Only created when active detection is enabled in the constructor.
  /// </summary>
  private System.Timers.Timer clientSideActiveDetectionByCheckingLongTimeNoSeeDataIncomingWatchTimer;

  // Static field of a generic class: initialized once per closed generic type.
  private static readonly NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("TcpServerCommunicator");

  /// <summary>Size, in bytes, of the buffer allocated for each read from the client stream.</summary>
  public static int tcpReceiveBufferSize = 1500;

  /// <summary>Parser used to deserialize each cut message into <typeparamref name="T"/>.</summary>
  private readonly Edge.Core.Parser.BinaryParser.ParserBase parser;

  /// <summary>Local port the listener binds to.</summary>
  private readonly int localTcpServerListeningPort;

  /// <summary>Listener created by <c>Start()</c> and stopped by <c>Dispose()</c>.</summary>
  protected TcpListener tcpListener;

  /// <summary>
  /// The single client currently being served. While it is non-null, newly accepted
  /// connections are closed immediately.
  /// </summary>
  public TcpClient exclusiveTcpClient { get; set; }

  /// <summary>Remote end point of the exclusive client as text, used in log messages only.</summary>
  private string exclusiveTcpClient_ClientRemoteEndPoint_Str = "?:?";

  /// <summary>Serializes socket sends in <c>Write</c>. Readonly so the monitor can never be swapped.</summary>
  private readonly object syncObject = new object();

  /// <summary>Serializes calls that feed received bytes into the message cutter.</summary>
  private readonly object syncObject_Feed = new object();

  /// <summary>
  /// 0 for not started, 1 for started.
  /// </summary>
  private int isStarted = 0;

  /// <summary>Set to true once <c>Dispose()</c> has run.</summary>
  protected bool disposed = false;

  /// <summary>Splits the incoming byte stream into individual messages; may be null.</summary>
  private readonly IMessageCutter<byte[]> messageCutter;

  /// <summary>Raised after a client has been accepted as the exclusive client.</summary>
  public event EventHandler OnConnected;

  /// <summary>Raised when reading from, or sending to, the exclusive client fails.</summary>
  public event EventHandler OnDisconnected;

  /// <summary>Raised when the cutter reports invalid data or a cut message cannot be deserialized.</summary>
  public event EventHandler<CommunicatorErrorMessageReadEventArg> OnErrorMessageRead;

  /// <summary>
  /// Raised with the serialized bytes before they are sent; a handler can set
  /// <c>Continue</c> to false to stop the send.
  /// </summary>
  public event EventHandler<CommunicatorEventArg<byte[], T>> OnRawDataWriting;

  /// <summary>Raised for every message that was cut and deserialized successfully.</summary>
  public event EventHandler<CommunicatorEventArg<byte[], T>> OnDataReceived;

  /// <summary>
  /// fired once raw data read from underlying tcp connection, without message cutting yet.
  /// </summary>
  protected Action<byte[]> OnRawDataReceived;

  /// <summary>Identity of this communicator; the constructor sets it to "*:" followed by the listening port.</summary>
  public string Identity { get; set; }
  /// <summary>
  /// Builds a communicator that will listen on <paramref name="localTcpServerTcpListeningPortNumber"/>,
  /// wires the message cutter's events to this communicator's events and, if requested,
  /// starts the idle-client watch timer.
  /// </summary>
  /// <param name="binaryMsgCutter">Cutter fed with received bytes; when null no cutter events are hooked.</param>
  /// <param name="parser">Parser used to deserialize every cut message into <typeparamref name="T"/>.</param>
  /// <param name="localTcpServerTcpListeningPortNumber">Local port to listen on; also used to build <c>Identity</c>.</param>
  /// <param name="enableClientSideActiveDetection">
  /// When greater than 0, the number of seconds without incoming data after which the pending
  /// read is cancelled; the check runs every 2 seconds. 0 or less disables the watch timer.
  /// </param>
  /// <param name="enableClientSideIdentityRegistration">Not used by this constructor.</param>
  /// <param name="enableClientSideConnLevelHeartbeat">Not used by this constructor.</param>
  /// <param name="customLoggerFileName">Not used by this constructor.</param>
  /// <param name="services">Not used by this constructor.</param>
  [ParamsJsonSchemas("TcpServerCommunicatorCtorSchema")]
  public TcpServerCommunicator(IMessageCutter<byte[]> binaryMsgCutter,
      Edge.Core.Parser.BinaryParser.ParserBase parser,
      int localTcpServerTcpListeningPortNumber,
      int enableClientSideActiveDetection,
      string enableClientSideIdentityRegistration,
      string enableClientSideConnLevelHeartbeat,
      string customLoggerFileName,
      IServiceProvider services)
  {
      this.Identity = $"*:{localTcpServerTcpListeningPortNumber}";
      this.messageCutter = binaryMsgCutter;
      this.parser = parser;
      this.localTcpServerListeningPort = localTcpServerTcpListeningPortNumber;

      if (this.messageCutter != null)
      {
          // Invalid bytes reported by the cutter: log them and forward as an error event.
          this.messageCutter.OnInvalidMessageRead += (cutter, invalidArgs) =>
          {
              var loggingStr = "Bytes msg from tcp client: " + exclusiveTcpClient_ClientRemoteEndPoint_Str
                  + " Read Invalid data, detail: " + (invalidArgs?.Message ?? "");
              logger.Info(loggingStr);
              this.OnErrorMessageRead?.Invoke(this, new CommunicatorErrorMessageReadEventArg(null, loggingStr));
          };

          // A complete message was cut: deserialize it, then hand it to subscribers.
          this.messageCutter.OnMessageCut += (cutter, cutArgs) =>
          {
              this.lastReceiveMsgDataFromTcpClientDateTime = DateTime.Now;
              var received = new CommunicatorEventArg<byte[], T>();
              try
              {
                  received.Data = this.messageCutter.Message;
                  received.Message = this.parser.Deserialize(this.messageCutter.Message.ToArray()) as T;
                  logger.Debug(" Parsed: " + received.Message.ToLogString());
              }
              catch (Exception deserializeError)
              {
                  var loggingStr = $"Message from {exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned in deserilaizing bytes:\r\n 0x{this.messageCutter.Message.ToHexLogString()}\r\n exception detail:\r\n{deserializeError}";
                  logger.Error(loggingStr);
                  this.OnErrorMessageRead?.Invoke(this, new CommunicatorErrorMessageReadEventArg(this.messageCutter.Message, loggingStr));
                  return;
              }

              try
              {
                  this.OnDataReceived?.Invoke(this, received);
              }
              catch (Exception handlerError)
              {
                  logger.Error($"Message from {exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned in handle message:\r\n{received.Message.ToLogString()}\r\n exceptioned detail: \r\n{handlerError}");
              }
          };
      }

      if (enableClientSideActiveDetection <= 0)
      {
          return;
      }

      var watchTimer = new System.Timers.Timer { Interval = 2000 };
      watchTimer.Elapsed += (timer, elapsed) =>
      {
          // No timestamp yet means this communicator has only just started.
          var lastSeen = this.lastReceiveMsgDataFromTcpClientDateTime;
          if (!lastSeen.HasValue)
          {
              return;
          }

          var silentSeconds = DateTime.Now.Subtract(lastSeen.Value).TotalSeconds;
          if (silentSeconds < enableClientSideActiveDetection || this.exclusiveTcpClient == null)
          {
              return;
          }

          logger.Info($"Long time no see data from tcp client with: { this.exclusiveTcpClient_ClientRemoteEndPoint_Str }, will actively disconnect it...");
          this.readAsyncCancellationTokenSource?.Cancel();
      };
      this.clientSideActiveDetectionByCheckingLongTimeNoSeeDataIncomingWatchTimer = watchTimer;
      watchTimer.Start();
  }
  /// <summary>
  /// Stops the communicator: clears the started flag, cancels the pending read, stops the
  /// listener, closes the accepted client and releases the watch timer.
  /// Never throws; each teardown step is attempted independently and failures are logged.
  /// Safe to call more than once.
  /// </summary>
  public void Dispose()
  {
      // Clear the flag first so the accept/read loops stop iterating once they wake up.
      Interlocked.Exchange(ref this.isStarted, 0);

      try
      {
          this.readAsyncCancellationTokenSource?.Cancel();
      }
      catch (Exception cancelError)
      {
          // Best effort: the source may already be disposed, or a registered callback threw.
          logger.Debug($"Cancel pending tcp read on port: {this.localTcpServerListeningPort} exceptioned: {cancelError.Message}");
      }

      try
      {
          this.tcpListener?.Stop();
      }
      catch (Exception stopError)
      {
          logger.Error($"Stop tcp listener on port: {this.localTcpServerListeningPort} exceptioned: {stopError}");
      }

      try
      {
          // Close the socket here as well: the read loop only closes it when it observes a
          // failed read, which it may never do if it exits on the started flag instead.
          // The property is deliberately not nulled here; the read loop owns that transition.
          this.exclusiveTcpClient?.Close();
      }
      catch (Exception closeError)
      {
          logger.Debug($"Close tcp client: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned: {closeError.Message}");
      }

      var watchTimer = this.clientSideActiveDetectionByCheckingLongTimeNoSeeDataIncomingWatchTimer;
      if (watchTimer != null)
      {
          watchTimer.Stop();
          // Stop() alone keeps the timer's resources alive; Dispose() releases them.
          watchTimer.Dispose();
      }

      this.disposed = true;
  }
  /// <summary>
  /// Stops the communicator by calling <see cref="Dispose"/>.
  /// </summary>
  /// <returns>
  /// A completed task with result true. If <see cref="Dispose"/> throws, the exception is
  /// carried by the returned task rather than thrown synchronously, as the former
  /// <c>async</c> version did.
  /// </returns>
  public virtual Task<bool> Stop()
  {
      // The work is fully synchronous, so no async state machine is needed (avoids CS1998).
      try
      {
          this.Dispose();
          return Task.FromResult(true);
      }
      catch (Exception stopError)
      {
          return Task.FromException<bool>(stopError);
      }
  }
  /// <summary>
  /// Starts listening on the configured local port and begins accepting clients in the
  /// background. Only one client is served at a time; further connections are closed
  /// until the current one is gone.
  /// </summary>
  /// <returns>
  /// A completed task: true when the listener was started by this call; false when the
  /// communicator was already started or the listener could not be started.
  /// </returns>
  public virtual Task<bool> Start()
  {
      if (0 != Interlocked.CompareExchange(ref this.isStarted, 1, 0))
      {
          return Task.FromResult(false);
      }

      try
      {
          this.tcpListener = new TcpListener(IPAddress.Any, this.localTcpServerListeningPort);
          this.tcpListener.Start();
          logger.Info($"TcpListener listened on localPort: {this.localTcpServerListeningPort}");
      }
      catch (Exception exxx)
      {
          logger.Error($"Start tcp listener on port: {this.localTcpServerListeningPort} exceptioned: {exxx}");
          try
          {
              this.tcpListener?.Stop();
          }
          catch (Exception stopError)
          {
              logger.Debug($"Stop tcp listener after failed start exceptioned: {stopError.Message}");
          }

          // Roll the flag back, otherwise every later Start() would return false forever.
          Interlocked.Exchange(ref this.isStarted, 0);
          return Task.FromResult(false);
      }

      // Pass the listener explicitly so a loop left over from a previous Start()
      // can tell that it is no longer the current one.
      var listener = this.tcpListener;
      _ = Task.Run(() => this.AcceptClientsAsync(listener));
      return Task.FromResult(true);
  }

  /// <summary>
  /// Accept loop: waits for connections on <paramref name="listener"/> until the communicator
  /// is stopped, promoting the first connection to exclusive client and closing any others.
  /// </summary>
  private async Task AcceptClientsAsync(TcpListener listener)
  {
      while (this.isStarted == 1)
      {
          logger.Info($"Waitting for connection on localPort: {this.localTcpServerListeningPort}");
          TcpClient newTcpClient;
          try
          {
              newTcpClient = await listener.AcceptTcpClientAsync();
          }
          catch (SocketException acceptError) when (this.isStarted == 1 && ReferenceEquals(listener, this.tcpListener))
          {
              // Still the active listener: treat as a transient failure and keep serving.
              logger.Error($"Accept tcp client on localPort: {this.localTcpServerListeningPort} exceptioned, will retry, detail: {acceptError}");
              await Task.Delay(500);
              continue;
          }
          catch (Exception acceptError)
          {
              // Expected when the listener is stopped while an accept is pending.
              logger.Info($"Stopped accepting connections on localPort: {this.localTcpServerListeningPort}, detail: {acceptError.Message}");
              return;
          }

          var remoteEndPoint = DescribeRemoteEndPoint(newTcpClient);
          logger.Info($" A tcp client with remote ip/port: {remoteEndPoint} has connected in");
          if (this.exclusiveTcpClient != null)
          {
              logger.Info($" There's already a previous TcpClient established as exclusive, so close this new one with remote ip/port: {remoteEndPoint}");
              try
              {
                  newTcpClient.Close();
              }
              catch (Exception closeError)
              {
                  logger.Debug($"Close rejected tcp client: {remoteEndPoint} exceptioned: {closeError.Message}");
              }

              continue;
          }

          this.exclusiveTcpClient = newTcpClient;
          this.exclusiveTcpClient_ClientRemoteEndPoint_Str = remoteEndPoint;
          logger.Info($" Tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} has been chosen as exclusive");
          _ = Task.Run(() => this.ServeExclusiveClientAsync(newTcpClient));
      }
  }

  /// <summary>
  /// Serves one accepted client: raises <c>OnConnected</c>, reads until the connection ends,
  /// the read is cancelled or the communicator stops, then always closes the client, frees
  /// the exclusive slot and raises <c>OnDisconnected</c>.
  /// </summary>
  private async Task ServeExclusiveClientAsync(TcpClient client)
  {
      // One cancellation source per connection: a Cancel() issued between two reads is then
      // observed by the next read instead of being lost on a source that was already replaced.
      var readCancellation = new CancellationTokenSource();
      this.readAsyncCancellationTokenSource = readCancellation;
      try
      {
          TryEnableTcpKeepAlive(client);
          this.lastReceiveMsgDataFromTcpClientDateTime = DateTime.Now;
          try
          {
              this.OnConnected?.Invoke(this, null);
          }
          catch (Exception connectedHandlerError)
          {
              // A faulty subscriber must not leave the exclusive slot occupied forever.
              logger.Error($"OnConnected handler for tcp client: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned: {connectedHandlerError}");
          }

          var stream = client.GetStream();
          while (this.isStarted == 1)
          {
              byte[] buffer = new byte[tcpReceiveBufferSize];
              int bytesReadCount;
              try
              {
                  bytesReadCount = await stream.ReadAsync(buffer, readCancellation.Token);
              }
              catch (Exception eeee)
              {
                  logger.Error($"tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned in GetStream().ReadAsync(), treat as tcp disconnection, detail: {eeee}");
                  break;
              }

              if (bytesReadCount == 0)
              {
                  // A zero-length read means the remote side closed the connection.
                  logger.Error($"tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} treat as tcp disconnection, detail: tcp server received 0 count data indicates the connection is broken");
                  break;
              }

              // Copy out exactly the bytes received; subscribers never see the read buffer.
              var data = buffer.Take(bytesReadCount).ToArray();
              if (logger.IsDebugEnabled)
              {
                  // Only build the hex dump when it will actually be written.
                  logger.Debug($"TCP from {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} <---Incoming: 0x" + buffer.Take(bytesReadCount).ToHexLogString());
              }

              try
              {
                  this.OnRawDataReceived?.Invoke(data);
              }
              catch (Exception rawHandlerError)
              {
                  logger.Debug($"OnRawDataReceived handler exceptioned: {rawHandlerError.Message}");
              }

              try
              {
                  lock (this.syncObject_Feed)
                  {
                      this.messageCutter.Feed(data);
                  }
              }
              catch (Exception ex)
              {
                  logger.Error($"Exception in Parsing msg bytes: 0x{ buffer.Take(bytesReadCount).ToHexLogString()}, detail: {Environment.NewLine}{ ex.ToString()}");
              }
          }
      }
      catch (Exception unexpected)
      {
          logger.Error($"Serving tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned unexpectedly, treat as tcp disconnection, detail: {unexpected}");
      }
      finally
      {
          try
          {
              client.Close();
          }
          catch (Exception closeError)
          {
              logger.Debug($"Close tcp client: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned: {closeError.Message}");
          }

          // Free the exclusive slot so the accept loop can take the next client.
          if (ReferenceEquals(this.exclusiveTcpClient, client))
          {
              this.exclusiveTcpClient = null;
          }

          // Detach the source before disposing it so later cancel attempts see null.
          Interlocked.CompareExchange(ref this.readAsyncCancellationTokenSource, null, readCancellation);
          readCancellation.Dispose();

          try
          {
              this.OnDisconnected?.Invoke(this, null);
          }
          catch (Exception disconnectedHandlerError)
          {
              logger.Debug($"OnDisconnected handler exceptioned: {disconnectedHandlerError.Message}");
          }
      }
  }

  /// <summary>
  /// Enables TCP keep-alive on non-Windows platforms. Failures are logged and ignored,
  /// because keep-alive is optional and must not prevent the client from being served.
  /// </summary>
  private static void TryEnableTcpKeepAlive(TcpClient client)
  {
      if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
      {
          return;
      }

      try
      {
          // Overall switch to enable the feature.
          client.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
          // Idle time before the first keep-alive probe is sent.
          client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, 20);
          // Interval between probes while earlier ones remain unanswered.
          client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, 8);
          // Number of unanswered probes after which the connection is treated as broken.
          client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, 3);
      }
      catch (Exception keepAliveError)
      {
          logger.Info($"Enable tcp keep alive for tcpClient exceptioned, will continue without it, detail: {keepAliveError.Message}");
      }
  }

  /// <summary>
  /// Returns the client's remote end point as text, or "?:?" when it cannot be read.
  /// </summary>
  private static string DescribeRemoteEndPoint(TcpClient client)
  {
      try
      {
          return client?.Client?.RemoteEndPoint?.ToString() ?? "?:?";
      }
      catch (Exception)
      {
          // The socket may already be closed or reset; the text is only used for logging.
          return "?:?";
      }
  }
  /// <summary>
  /// Serializes <paramref name="message"/> and sends it to the exclusive client.
  /// </summary>
  /// <param name="message">Message to send; null is rejected.</param>
  /// <returns>
  /// true when the bytes were handed to the socket. false when no client is connected,
  /// the message is null, serialization fails or yields no bytes, an <c>OnRawDataWriting</c>
  /// handler sets <c>Continue</c> to false, or the send itself fails.
  /// </returns>
  /// <remarks>
  /// A failed send is treated as a broken connection: the pending read is cancelled and
  /// <c>OnDisconnected</c> is raised.
  /// </remarks>
  public bool Write(T message)
  {
      if (this.exclusiveTcpClient == null)
      {
          return false;
      }

      if (message == null)
      {
          return false;
      }

      byte[] rawData;
      TcpClient client;
      try
      {
          rawData = message.ToCommonByteArray();
          var arg = new CommunicatorEventArg<byte[], T>() { Data = rawData, Message = message, Continue = true };
          this.OnRawDataWriting?.Invoke(this, arg);

          // Take one snapshot after the handlers ran; the read loop may null the property at any time.
          client = this.exclusiveTcpClient;
          if (client == null || !arg.Continue)
          {
              logger.Error("Write failed, this.tcpClient is null: " + (client is null));
              return false;
          }
      }
      catch (Exception exx)
      {
          var msgLogStr = "";
          try
          {
              msgLogStr = message.ToLogString();
          }
          catch
          {
              msgLogStr = "exceptioned for get ToLogString()";
          }

          logger.Error("Tcp Write failed in serialize or event raise for msg: " + message.GetType() + " -> " + msgLogStr + "\r\n detail: " + exx);
          return false;
      }

      if (rawData == null || rawData.Length == 0)
      {
          // Nothing to send. Reject here so it is not mistaken for a broken connection below.
          logger.Error("Tcp Write failed as msg: " + message.GetType() + " was serialized to no bytes");
          return false;
      }

      lock (this.syncObject)
      {
          try
          {
              // NOTE(review): skips the log whenever byte 6 is 0x55 OR byte 7 is 0x10, and needs
              // more than 8 bytes although only indexes 6 and 7 are read; confirm the intended filter.
              if (rawData.Length > 8 && rawData[6] != 0x55 && rawData[7] != 0x10)
              {
                  logger.Info("TCP to " + (this.exclusiveTcpClient_ClientRemoteEndPoint_Str) + " Outgoing--->: " + message.ToLogString() + "\r\n 0x" + rawData.ToHexLogString());
              }
          }
          catch (Exception logError)
          {
              // A failure while formatting the log line must not be treated as a send failure.
              logger.Debug("Tcp Write failed to log outgoing msg: " + logError.Message);
          }

          try
          {
              var sendCount = client.Client.Send(rawData);
              if (sendCount == 0)
              {
                  throw new InvalidOperationException("the send count in this.exclusiveTcpClient.Client.Send is 0");
              }

              return true;
          }
          catch (Exception exx)
          {
              logger.Error("Send tcp msg to "
                  + (this.exclusiveTcpClient_ClientRemoteEndPoint_Str) + " Write(...) exceptioned, treat as a broken tcp connection, will cancel the data read as well, detail: " + exx);
              try
              {
                  this.readAsyncCancellationTokenSource?.Cancel();
              }
              catch (Exception cancelError)
              {
                  // Best effort: the source may already be disposed.
                  logger.Debug("Cancel pending tcp read exceptioned: " + cancelError.Message);
              }

              try
              {
                  this.OnDisconnected?.Invoke(this, null);
              }
              catch (Exception disconnectedHandlerError)
              {
                  logger.Debug("OnDisconnected handler exceptioned: " + disconnectedHandlerError.Message);
              }

              // The message was not delivered, so report failure to the caller.
              return false;
          }
      }
  }
  /// <summary>
  /// Overload taking an extra control parameter; this communicator does not implement it.
  /// </summary>
  /// <param name="message">Ignored.</param>
  /// <param name="extraControlParameter">Ignored.</param>
  /// <exception cref="NotImplementedException">Always thrown.</exception>
  public bool Write(T message, object extraControlParameter)
      => throw new NotImplementedException();
  342. //public TcpClient? GetTcpClient()
  343. //{
  344. // return this.exclusiveTcpClient;
  345. //}
  346. //public int GetServerPort()
  347. //{
  348. // return this.localTcpServerListeningPort;
  349. //}
  350. }
  351. }