using Edge.Core.Parser.BinaryParser.MessageEntity;
using Edge.Core.Parser.BinaryParser.Util;
using Edge.Core.Processor.Dispatcher.Attributes;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Edge.Core.Processor.Communicator
{
    /// <summary>
    /// TCP communicator in which this process is the server: it listens on a local port and
    /// serves exactly one ("exclusive") client at a time. Any additional client that connects
    /// while one is already being served is closed immediately.
    /// Raw bytes read from the client are fed into the message cutter; every message cut is
    /// deserialized by the parser and published through <see cref="OnDataReceived"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this file (ICommunicator&lt;byte[], T&gt;,
    /// CommunicatorEventArg&lt;byte[], T&gt;, IMessageCutter&lt;byte[]&gt;, Action&lt;byte[]&gt;, Task&lt;bool&gt;)
    /// were reconstructed from how the members are used here - confirm against the declarations.
    /// </remarks>
    /// <typeparam name="T">Message type produced by the parser.</typeparam>
    [MetaPartsDescriptor(
        "lang-zh-cn:Tcp(本程序为服务器端)通讯器lang-en-us:Tcp(as server) communicator",
        "lang-zh-cn:基于TCP/IP技术的通讯器, FC作为服务器端等待客户端连接lang-en-us:TCP/IP based communicator, FC as the server and wait for connections")]
    public class TcpServerCommunicator<T> : ICommunicator<byte[], T> where T : MessageTemplateBase
    {
        /// <summary>Cancels the pending read of the current exclusive client; one instance per connection.</summary>
        private CancellationTokenSource readAsyncCancellationTokenSource;

        /// <summary>
        /// UTC time of the last message cut (or of the last connect); null until a client has connected.
        /// </summary>
        private DateTime? lastReceiveMsgDataFromTcpClientDateTime;

        /// <summary>Periodically checks whether the client has been silent for too long.</summary>
        private System.Timers.Timer clientSideActiveDetectionByCheckingLongTimeNoSeeDataIncomingWatchTimer;

        protected ILogger logger = NullLogger.Instance;

        /// <summary>Size in bytes of the buffer allocated for each read from the tcp stream.</summary>
        public static int tcpReceiveBufferSize = 1500;

        private Edge.Core.Parser.BinaryParser.ParserBase parser;
        private int localTcpServerListeningPort;
        protected TcpListener tcpListener;
        private TcpClient exclusiveTcpClient;
        private string exclusiveTcpClient_ClientRemoteEndPoint_Str = "?:?";

        /// <summary>Serializes the send part of <see cref="Write(T)"/>.</summary>
        private object syncObject = new object();

        /// <summary>Serializes feeding of received bytes into the message cutter.</summary>
        private object syncObject_Feed = new object();

        /// <summary>
        /// 0 for not started, 1 for started.
        /// </summary>
        private int isStarted = 0;

        protected bool disposed = false;
        private IMessageCutter<byte[]> messageCutter;

        public event EventHandler OnConnected;
        public event EventHandler OnDisconnected;
        public event EventHandler<CommunicatorErrorMessageReadEventArg> OnErrorMessageRead;
        public event EventHandler<CommunicatorEventArg<byte[], T>> OnRawDataWriting;
        public event EventHandler<CommunicatorEventArg<byte[], T>> OnDataReceived;

        /// <summary>
        /// fired once raw data read from underlying tcp connection, without message cutting yet.
        /// </summary>
        protected Action<byte[]> OnRawDataReceived;

        public string Identity { get; set; }

        /// <summary>
        /// Creates the communicator and wires the message cutter events; listening does not begin until <see cref="Start"/>.
        /// </summary>
        /// <param name="binaryMsgCutter">Cuts the incoming byte stream into messages; when null no cutter events are wired.</param>
        /// <param name="parser">Serializes outgoing and deserializes incoming messages.</param>
        /// <param name="localTcpServerTcpListeningPortNumber">Local port to listen on (all interfaces).</param>
        /// <param name="enableClientSideActiveDetection">
        /// When greater than 0: number of seconds without a cut message after which the client is
        /// actively disconnected (checked every 2 seconds). Values &lt;= 0 disable the detection.
        /// </param>
        /// <param name="enableClientSideIdentityRegistration">Not used by this constructor.</param>
        /// <param name="enableClientSideConnLevelHeartbeat">Not used by this constructor.</param>
        /// <param name="customLoggerFileName">
        /// Empty/null keeps the shared "Communicator" logger; "*" creates a logger named after the port;
        /// any other value creates a logger named after that value.
        /// </param>
        /// <param name="services">Service provider used to resolve the logger factory; when null a null-logger is used.</param>
        [ParamsJsonSchemas("TcpServerCommunicatorCtorSchema")]
        public TcpServerCommunicator(IMessageCutter<byte[]> binaryMsgCutter,
            Edge.Core.Parser.BinaryParser.ParserBase parser,
            int localTcpServerTcpListeningPortNumber,
            int enableClientSideActiveDetection,
            string enableClientSideIdentityRegistration,
            string enableClientSideConnLevelHeartbeat,
            string customLoggerFileName,
            IServiceProvider services)
        {
            this.Identity = "*:" + localTcpServerTcpListeningPortNumber;
            if (services != null)
            {
                var loggerFactory = services.GetRequiredService<ILoggerFactory>();
                this.logger = loggerFactory.CreateLogger("Communicator");
                if (!string.IsNullOrEmpty(customLoggerFileName))
                {
                    this.logger = customLoggerFileName == "*"
                        ? loggerFactory.CreateLogger("DynamicPrivate_Comm_" + localTcpServerTcpListeningPortNumber)
                        : loggerFactory.CreateLogger("DynamicPrivate_Comm_" + customLoggerFileName);
                }
            }

            this.messageCutter = binaryMsgCutter;
            this.parser = parser;
            this.localTcpServerListeningPort = localTcpServerTcpListeningPortNumber;

            if (this.messageCutter != null)
            {
                this.messageCutter.OnInvalidMessageRead += (sender, invalidArg) =>
                {
                    var loggingStr = $"Bytes msg from tcp client: {exclusiveTcpClient_ClientRemoteEndPoint_Str} Read Invalid data, detail: {(invalidArg?.Message ?? "")}";
                    this.logger.LogInformation(loggingStr);
                    this.OnErrorMessageRead?.Invoke(this, new CommunicatorErrorMessageReadEventArg(null, loggingStr));
                };
                this.messageCutter.OnMessageCut += (sender, cutArg) => this.HandleMessageCut();
            }

            if (enableClientSideActiveDetection > 0)
            {
                var watchTimer = new System.Timers.Timer();
                watchTimer.Interval = 2000;
                watchTimer.Elapsed += (s, a) =>
                {
                    // null indicates this communicator just started (no client has connected yet).
                    var lastSeenUtc = this.lastReceiveMsgDataFromTcpClientDateTime;
                    if (lastSeenUtc == null)
                    {
                        return;
                    }

                    // UTC is used on both sides so DST / wall-clock adjustments cannot fake an idle period.
                    if (DateTime.UtcNow.Subtract(lastSeenUtc.Value).TotalSeconds >= enableClientSideActiveDetection
                        && this.exclusiveTcpClient != null)
                    {
                        this.logger.LogInformation($"Long time no see data from tcp client with: { this.exclusiveTcpClient_ClientRemoteEndPoint_Str }, will actively disconnect it...");
                        this.TryCancelPendingRead();
                    }
                };
                this.clientSideActiveDetectionByCheckingLongTimeNoSeeDataIncomingWatchTimer = watchTimer;
                watchTimer.Start();
            }
        }

        /// <summary>
        /// Stops listening, cancels the pending read of the current client (which makes the read loop
        /// close it) and releases the idle-detection timer. Never throws; safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            // Flip the flag first so the accept/read loops see the stop as soon as they wake up.
            this.isStarted = 0;
            this.TryCancelPendingRead();

            try
            {
                this.tcpListener?.Stop();
            }
            catch (Exception ex)
            {
                this.logger.LogError($"Stop tcp listener on port: {this.localTcpServerListeningPort} exceptioned: {ex}");
            }

            var watchTimer = this.clientSideActiveDetectionByCheckingLongTimeNoSeeDataIncomingWatchTimer;
            if (watchTimer != null)
            {
                watchTimer.Stop();
                watchTimer.Dispose();
            }

            this.disposed = true;
        }

        /// <summary>Stops the communicator by disposing it.</summary>
        /// <returns>Always a completed task with result true.</returns>
        public virtual Task<bool> Stop()
        {
            this.Dispose();
            return Task.FromResult(true);
        }

        /// <summary>
        /// Starts listening on the configured local port and begins accepting clients in the background.
        /// </summary>
        /// <returns>
        /// true when the listener was started by this call; false when it was already started
        /// or the listener could not be started (in which case Start may be called again).
        /// </returns>
        public virtual Task<bool> Start()
        {
            if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) != 0)
            {
                return Task.FromResult(false);
            }

            try
            {
                var listener = new TcpListener(IPAddress.Any, this.localTcpServerListeningPort);
                this.tcpListener = listener;
                listener.Start();
                this.logger.LogInformation($"TcpListener listened on localPort: {this.localTcpServerListeningPort}");
                _ = Task.Run(() => this.AcceptLoopAsync(listener));
                return Task.FromResult(true);
            }
            catch (Exception exxx)
            {
                logger.LogError($"Start tcp listener on port: {this.localTcpServerListeningPort} exceptioned: {exxx}");

                // Roll back so that a later Start() is not rejected as "already started".
                try
                {
                    this.tcpListener?.Stop();
                }
                catch
                {
                    // best effort: the listener never reached a usable state.
                }

                Interlocked.Exchange(ref this.isStarted, 0);
                return Task.FromResult(false);
            }
        }

        /// <summary>
        /// Serializes and sends a message to the exclusive client.
        /// </summary>
        /// <param name="message">Message to send; null is rejected.</param>
        /// <returns>
        /// true when the bytes were handed to the socket; false when there is no client, the message is null,
        /// serialization failed, an <see cref="OnRawDataWriting"/> subscriber vetoed the write, or the send failed.
        /// </returns>
        public bool Write(T message)
        {
            if (this.exclusiveTcpClient == null)
            {
                return false;
            }

            if (message == null)
            {
                return false;
            }

            byte[] rawData;
            try
            {
                rawData = this.parser.Serialize(message);
                var arg = new CommunicatorEventArg<byte[], T>() { Data = rawData, Message = message, Continue = true };
                this.OnRawDataWriting?.Invoke(this, arg);
                if (this.exclusiveTcpClient == null || !arg.Continue)
                {
                    this.logger.LogError("Write failed, this.tcpClient is null: " + (this.exclusiveTcpClient is null));
                    return false;
                }
            }
            catch (Exception exx)
            {
                string msgLogStr;
                try
                {
                    msgLogStr = message.ToLogString();
                }
                catch
                {
                    msgLogStr = "exceptioned for get ToLogString()";
                }

                this.logger.LogError("Tcp Write failed in serialize or event raise for msg: " + message.GetType() + " -> " + msgLogStr + "\r\n detail: " + exx);
                return false;
            }

            lock (this.syncObject)
            {
                // Snapshot: the read loop may null the field at any time once the connection drops.
                var client = this.exclusiveTcpClient;
                if (client == null)
                {
                    this.logger.LogError("Write failed, this.tcpClient is null: " + true);
                    return false;
                }

                try
                {
                    if (logger.IsEnabled(LogLevel.Debug))
                    {
                        this.logger.LogDebug("TCP to " + (this.exclusiveTcpClient_ClientRemoteEndPoint_Str) + " Outgoing--->: " + message.ToLogString() + "\r\n 0x" + rawData.ToHexLogString());
                    }

                    var sendCount = client.Client.Send(rawData);
                    if (sendCount == 0)
                    {
                        throw new InvalidOperationException("the send count in this.exclusiveTcpClient.Client.Send is 0");
                    }
                }
                catch (Exception exx)
                {
                    this.logger.LogError("Send tcp msg to " + (this.exclusiveTcpClient_ClientRemoteEndPoint_Str) + " Write(...) exceptioned, treat as a broken tcp connection, will cancel the data read as well, detail: " + exx);
                    this.TryCancelPendingRead();
                    this.OnDisconnected?.Invoke(this, null);

                    // The message was not sent: report the failure instead of claiming success.
                    return false;
                }
            }

            return true;
        }

        /// <summary>Not supported by this communicator.</summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public bool Write(T message, object extraControlParameter)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Handles a message cut by the message cutter: refreshes the last-seen timestamp,
        /// deserializes the bytes and publishes the result through <see cref="OnDataReceived"/>.
        /// Deserialization failures are reported through <see cref="OnErrorMessageRead"/>.
        /// </summary>
        private void HandleMessageCut()
        {
            this.lastReceiveMsgDataFromTcpClientDateTime = DateTime.UtcNow;
            var eventArg = new CommunicatorEventArg<byte[], T>();
            try
            {
                eventArg.Data = this.messageCutter.Message;
                eventArg.Message = this.parser.Deserialize(this.messageCutter.Message.ToArray()) as T;
                if (logger.IsEnabled(LogLevel.Debug))
                {
                    this.logger.LogDebug(" Parsed: " + eventArg.Message.ToLogString());
                }
            }
            catch (Exception ex)
            {
                var loggingStr = "Message from " + exclusiveTcpClient_ClientRemoteEndPoint_Str
                    + " exceptioned in deserilaizing bytes:\r\n 0x" + this.messageCutter.Message.ToHexLogString()
                    + "\r\n exception detail:\r\n" + ex;
                this.logger.LogError(loggingStr);
                this.OnErrorMessageRead?.Invoke(this, new CommunicatorErrorMessageReadEventArg(this.messageCutter.Message, loggingStr));
                return;
            }

            try
            {
                this.OnDataReceived?.Invoke(this, eventArg);
            }
            catch (Exception ex)
            {
                // Message may be null when the parser produced a type other than T; do not let logging throw.
                this.logger.LogError("Message from " + exclusiveTcpClient_ClientRemoteEndPoint_Str
                    + " exceptioned in handle message:\r\n" + eventArg.Message?.ToLogString()
                    + "\r\n exceptioned detail: \r\n" + ex);
            }
        }

        /// <summary>
        /// Accepts clients until the communicator is stopped or the listener is replaced.
        /// The first client becomes the exclusive one; others are closed while it is connected.
        /// </summary>
        private async Task AcceptLoopAsync(TcpListener listener)
        {
            while (this.isStarted == 1 && ReferenceEquals(listener, this.tcpListener))
            {
                logger.LogInformation($"Waitting for connection on localPort: {this.localTcpServerListeningPort}");

                TcpClient newTcpClient;
                try
                {
                    newTcpClient = await listener.AcceptTcpClientAsync();
                }
                catch (Exception ex)
                {
                    if (this.isStarted != 1 || !ReferenceEquals(listener, this.tcpListener))
                    {
                        // Expected: Dispose()/Stop() stopped the listener while we were waiting.
                        this.logger.LogInformation($"Stopped accepting connection on localPort: {this.localTcpServerListeningPort}");
                        break;
                    }

                    this.logger.LogError($"Accepting connection on localPort: {this.localTcpServerListeningPort} exceptioned: {ex}");

                    // Short pause so a persistently failing listener cannot spin the CPU.
                    await Task.Delay(1000);
                    continue;
                }

                this.logger.LogInformation($" A tcp client with remote ip/port: {newTcpClient.Client.RemoteEndPoint} has connected in");
                if (this.exclusiveTcpClient != null)
                {
                    logger.LogInformation($" There's already a previous TcpClient established as exclusive, so close this new one with remote ip/port: {newTcpClient.Client.RemoteEndPoint}");
                    try
                    {
                        newTcpClient.Close();
                    }
                    catch
                    {
                        // best effort: the rejected client is discarded either way.
                    }

                    continue;
                }

                // Publish the cancellation source before the client so nobody observes a client without one.
                var readCancellation = new CancellationTokenSource();
                this.exclusiveTcpClient_ClientRemoteEndPoint_Str = newTcpClient.Client.RemoteEndPoint.ToString();
                this.readAsyncCancellationTokenSource = readCancellation;
                this.exclusiveTcpClient = newTcpClient;
                this.logger.LogInformation($" Tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} has been chosen as exclusive");

                _ = Task.Run(() => this.ServeExclusiveClientAsync(newTcpClient, readCancellation));
            }
        }

        /// <summary>
        /// Reads from the exclusive client until the connection breaks, the read is cancelled or the
        /// communicator is stopped, then always tears the connection down and raises <see cref="OnDisconnected"/>.
        /// </summary>
        private async Task ServeExclusiveClientAsync(TcpClient client, CancellationTokenSource readCancellation)
        {
            try
            {
                this.TryEnableTcpKeepAlive(client);
                this.lastReceiveMsgDataFromTcpClientDateTime = DateTime.UtcNow;
                try
                {
                    this.OnConnected?.Invoke(this, null);
                }
                catch (Exception ex)
                {
                    // A failing subscriber must not leave the connection half set up.
                    this.logger.LogError($"OnConnected handler for tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned: {ex}");
                }

                while (this.isStarted == 1)
                {
                    byte[] buffer = new byte[tcpReceiveBufferSize];
                    int bytesReadCount = 0;
                    string brokenReason = null;
                    try
                    {
                        bytesReadCount = await client.GetStream().ReadAsync(buffer, readCancellation.Token);
                        if (bytesReadCount == 0)
                        {
                            brokenReason = "tcp server received 0 count data indicates the connection is broken";
                        }
                        else if (logger.IsEnabled(LogLevel.Debug))
                        {
                            this.logger.LogDebug($"TCP from { this.exclusiveTcpClient_ClientRemoteEndPoint_Str } <---Incoming: 0x" + buffer.Take(bytesReadCount).ToHexLogString());
                        }
                    }
                    catch (Exception eeee)
                    {
                        brokenReason = eeee.ToString();
                    }

                    if (brokenReason != null)
                    {
                        logger.LogError($"tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned in GetStream().ReadAsync(), treat as tcp disconnection, detail: {brokenReason}");
                        break;
                    }

                    var data = buffer.Take(bytesReadCount).ToArray();
                    try
                    {
                        this.OnRawDataReceived?.Invoke(data);
                    }
                    catch
                    {
                        // raw-data observers are best effort and must not break the read loop.
                    }

                    try
                    {
                        lock (this.syncObject_Feed)
                        {
                            this.messageCutter.Feed(data, this.localTcpServerListeningPort);
                        }
                    }
                    catch (Exception ex)
                    {
                        this.logger.LogError($"Exception in Parsing msg bytes: 0x{ buffer.Take(bytesReadCount).ToHexLogString()}, detail: {Environment.NewLine}{ ex.ToString()}");
                    }
                }
            }
            catch (Exception ex)
            {
                this.logger.LogError($"Serving tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned unexpectedly: {ex}");
            }
            finally
            {
                this.TearDownExclusiveClient(client, readCancellation);
            }
        }

        /// <summary>
        /// Closes the client, releases its cancellation source, frees the exclusive slot so a new
        /// client can be accepted, and raises <see cref="OnDisconnected"/>. Never throws.
        /// </summary>
        private void TearDownExclusiveClient(TcpClient client, CancellationTokenSource readCancellation)
        {
            try
            {
                client.Close();
            }
            catch (Exception ex)
            {
                this.logger.LogError($"Close tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned: {ex}");
            }

            // Only clear the fields when they still belong to this connection.
            Interlocked.CompareExchange(ref this.readAsyncCancellationTokenSource, null, readCancellation);
            readCancellation.Dispose();
            Interlocked.CompareExchange(ref this.exclusiveTcpClient, null, client);

            try
            {
                this.OnDisconnected?.Invoke(this, null);
            }
            catch
            {
                // subscribers must not prevent the slot from being released.
            }
        }

        /// <summary>
        /// Tries to enable TCP keep-alive on non-Windows platforms (20s idle, 8s interval, 3 retries).
        /// Failures are logged and otherwise ignored.
        /// </summary>
        private void TryEnableTcpKeepAlive(TcpClient client)
        {
            // try set the tcp client with TCP keepalive feature, but seems does not work.
            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
            {
                return;
            }

            try
            {
                // overall switch to enable the feature.
                client.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
                // when entering tcp idle, how much time wait before send a KeepAlive package.
                client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, 20);
                // when in tcp idle, and failed to received previous KeepAlive response, sequence of KeepAlive packages
                // will send by this interval, until response received, or TcpKeepAliveRetryCount reached.
                client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, 8);
                // when in tcp idle, how many KeepAlive package response have not received will treat as tcp broken,
                // and trigger tcp disconnected exception.
                client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, 3);
            }
            catch (Exception ex)
            {
                this.logger.LogError($"Enable tcp keep alive for tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned: {ex}");
            }
        }

        /// <summary>Cancels the pending read of the current connection, if any. Never throws.</summary>
        private void TryCancelPendingRead()
        {
            try
            {
                this.readAsyncCancellationTokenSource?.Cancel();
            }
            catch
            {
                // the source may already be disposed because the connection just ended.
            }
        }
    }
}