using Edge.Core.Parser.BinaryParser.MessageEntity; using Edge.Core.Parser.BinaryParser.Util; using Edge.Core.Processor.Dispatcher.Attributes; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using System; using System.Collections.Generic; using System.Linq; using System.Net.Sockets; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Edge.Core.Processor.Communicator { [MetaPartsDescriptor( "lang-zh-cn:Tcp(本机为客户端)通讯器lang-en-us:Tcp(as client) communicator", "lang-zh-cn:基于TCP/IP技术的通讯器, FC作为客户端主动连接服务器端lang-en-us:TCP/IP based communicator, FC as the client side and connecting to Server side")] public class TcpClientCommunicator : ICommunicator where T : MessageTemplateBase { //protected static NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Communicator"); protected ILogger logger = NullLogger.Instance; public static int tcpReceiveBufferSize = 1500; private Edge.Core.Parser.BinaryParser.ParserBase parser; private string remoteTcpServerListeningIpAddress; private int remoteTcpServerListeningPortNumber; protected TcpClient tcpClient; /// /// 0 is for false, 1 is for true. /// private int isInTcpConnecting = 0; private object syncObject = new object(); private object syncObject_Feed = new object(); private bool enableTcpKeepAlive = true; /// /// 0 for not started, 1 for started. /// private int isStarted = 0; protected bool disposed = false; private IMessageCutter messageCutter; //private List continueParseStack = new List(); public event EventHandler OnConnected; public event EventHandler OnDisconnected; public event EventHandler OnErrorMessageRead; public event EventHandler> OnRawDataWriting; public event EventHandler> OnDataReceived; /// /// fired once raw data read from underlying tcp connection, without message cutting yet. /// protected Action OnRawDataReceived; public string Identity { get; set; } [ParamsJsonSchemas("TcpClientCommunicatorCtorSchema")] public TcpClientCommunicator(IMessageCutter binaryMsgCutter, Edge.Core.Parser.BinaryParser.ParserBase parser, string remoteTcpServerListeningIpAddress, int remoteTcpServerTcpListeningPortNumber, string customLoggerFileName, IServiceProvider services) { this.Identity = remoteTcpServerListeningIpAddress + ":" + remoteTcpServerTcpListeningPortNumber; if (services != null) { var loggerFactory = services.GetRequiredService(); this.logger = loggerFactory.CreateLogger("Communicator"); if (!string.IsNullOrEmpty(customLoggerFileName)) if (customLoggerFileName == "*") this.logger = loggerFactory.CreateLogger("DynamicPrivate_Comm_" + remoteTcpServerListeningIpAddress + "_" + remoteTcpServerTcpListeningPortNumber); else this.logger = loggerFactory.CreateLogger("DynamicPrivate_Comm_" + customLoggerFileName); } this.messageCutter = binaryMsgCutter; this.parser = parser; this.remoteTcpServerListeningIpAddress = remoteTcpServerListeningIpAddress; this.remoteTcpServerListeningPortNumber = remoteTcpServerTcpListeningPortNumber; if (this.messageCutter != null) { this.messageCutter.OnInvalidMessageRead += (____, ______) => { var loggingStr = $"Message from {this.remoteTcpServerListeningIpAddress}:{this.remoteTcpServerListeningPortNumber} Read Invalid data, detail: {(______?.Message ?? "")}"; this.logger.LogInformation(loggingStr); this.OnErrorMessageRead?.Invoke(this, new CommunicatorErrorMessageReadEventArg(null, loggingStr)); }; this.messageCutter.OnMessageCut += (s, _) => { var eventArg = new CommunicatorEventArg(); try { eventArg.Data = this.messageCutter.Message; eventArg.Message = this.parser.Deserialize(this.messageCutter.Message.ToArray()) as T; if (logger.IsEnabled(LogLevel.Debug)) this.logger.LogDebug(" Parsed: " + eventArg.Message.ToLogString()); } catch (Exception ex) { var loggingStr = "Message from " + this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber + " exceptioned in deserilaizing bytes:\r\n 0x" + this.messageCutter.Message.ToHexLogString() + "\r\n exception detail:\r\n" + ex; this.logger.LogError(loggingStr); this.OnErrorMessageRead?.Invoke(this, new CommunicatorErrorMessageReadEventArg(this.messageCutter.Message, loggingStr)); return; } try { this.OnDataReceived?.Invoke(this, eventArg); } catch (Exception ex) { this.logger.LogError(this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber + " exceptioned in handle message:\r\n" + eventArg.Message.ToLogString() + "\r\n exceptioned detail: \r\n" + ex); } }; } } public void Dispose() { this.tcpClient?.Close(); this.disposed = true; } public virtual Task Start() { if (0 == Interlocked.CompareExchange(ref this.isStarted, 1, 0)) { try { this.SetupTcpClientConnectionInBackground(); return Task.FromResult(true); } catch (Exception exxx) { return Task.FromResult(false); } } return Task.FromResult(false); } public bool Write(T message) { if (message == null) return false; byte[] rawData; try { //rawData = this.parser.Serialize(message); rawData = message.ToCommonByteArray(); var safe = this.OnRawDataWriting; var arg = new CommunicatorEventArg() { Data = rawData, Message = message, Continue = true }; safe?.Invoke(this, arg); if (this.tcpClient == null || !arg.Continue) { this.logger.LogError("Write failed, this.tcpClient is null? " + (this.tcpClient is null)); return false; } } catch (Exception exx) { var msgLogStr = ""; try { msgLogStr = message.ToLogString(); } catch { msgLogStr = "exceptioned for get ToLogString()"; } this.logger.LogError("Tcp Write failed in serialize or event raise for msg: " + message.GetType() + " -> " + msgLogStr + "\r\n detail: " + exx); return false; } lock (this.syncObject) { try { if (this.isInTcpConnecting == 1) return false; if (logger.IsEnabled(LogLevel.Debug)) this.logger.LogDebug("TCP to " + this.tcpClient.Client.RemoteEndPoint + " Outgoing--->: " + message.ToLogString() + "\r\n 0x" + rawData.ToHexLogString()); this.tcpClient.Client.Send(rawData); } catch (Exception exx) { this.logger.LogError("Send tcp msg to " + this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber + " Write(...) exceptioned, will re-build connection, detail: " + exx); var _ = this.OnDisconnected; _?.Invoke(this, null); this.SetupTcpClientConnectionInBackground(); } } return true; } public bool Write(T message, object extraControlParameter) { throw new NotImplementedException(); } /// /// connect to remote pump, FC as the client side. /// /// private void SetupTcpClientConnectionInBackground() { if (0 == Interlocked.CompareExchange(ref this.isInTcpConnecting, 1, 0)) ThreadPool.QueueUserWorkItem(o => { while (!this.disposed) { try { this.logger.LogInformation("FCC as the tcp client side is establishing to remote tcp server " + this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber); if (this.tcpClient != null) this.tcpClient.Dispose(); this.tcpClient = new TcpClient(this.remoteTcpServerListeningIpAddress, this.remoteTcpServerListeningPortNumber); if (this.enableTcpKeepAlive) { if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { } else { //overall switch to enable the feature. this.tcpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); //when entering tcp idle, how much time wait before send a KeepAlive package. this.tcpClient.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, 20); //when in tcp idle, and failed to received previous KeepAlive response, sequence of KeepAlive packages will send by this interval, until response received, or TcpKeepAliveRetryCount reached. this.tcpClient.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, 8); //when in tcp idle, how many KeepAlive package response have not received will treat as tcp broken, and trigger tcp disconnected exception. this.tcpClient.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, 3); } } logger.LogInformation("FCC has established to remote tcp server(" + this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber + "), local port: " + this.tcpClient.Client.LocalEndPoint.ToString()); byte[] buffer = new byte[tcpReceiveBufferSize]; this.tcpClient.GetStream().BeginRead(buffer, 0, tcpReceiveBufferSize, this.Tcp_DataReceived, new System.Tuple(this.tcpClient, buffer)); this.isInTcpConnecting = 0; var _ = this.OnConnected; _?.Invoke(this, null); return; } catch (Exception exxx) { this.logger.LogError("Setup tcp client which connect to remote tcp server(" + this.remoteTcpServerListeningIpAddress + ":" + this.remoteTcpServerListeningPortNumber + ") exceptioned: " + exxx + "\r\n Will keep retrying..."); Thread.Sleep(3000); } } }); } private void Tcp_DataReceived(IAsyncResult ar) { System.Tuple parameter = null; byte[] dataRead = null; int bytesReadCount = 0; try { parameter = (System.Tuple)(ar.AsyncState); dataRead = parameter.Item2; bytesReadCount = parameter.Item1.Client.EndReceive(ar); if (bytesReadCount == 0) throw new Exception("tcp client received 0 count data which indicates the connection is broken, trigger disconnection"); } catch (Exception tcpExp) { this.logger.LogInformation("Tcp client EndReceive from " + this.remoteTcpServerListeningIpAddress + " exceptioned: " + tcpExp + "\r\n will re-build connection..."); var _ = this.OnDisconnected; _?.Invoke(this, null); this.SetupTcpClientConnectionInBackground(); return; } try { if (logger.IsEnabled(LogLevel.Debug)) this.logger.LogDebug("TCP from " + parameter.Item1.Client.RemoteEndPoint.ToString() + " <---Incoming: 0x" + dataRead.Take(bytesReadCount).ToHexLogString()); try { var data = dataRead.Take(bytesReadCount).ToArray(); this.OnRawDataReceived?.Invoke(data); lock (this.syncObject_Feed) { this.messageCutter.Feed(data); } } catch (Exception ex) { this.logger.LogError(" Exception in TCP Parsing bytes: 0x" + dataRead.Take(bytesReadCount).ToHexLogString() + ", detail: \r\n" + ex.ToString()); } byte[] buffer = new byte[tcpReceiveBufferSize]; parameter.Item1.GetStream().BeginRead(buffer, 0, tcpReceiveBufferSize, this.Tcp_DataReceived, new System.Tuple(parameter.Item1, buffer)); } catch (Exception ex) { this.logger.LogError(" Exception in Tcp_DataReceived(...):\r\n" + ex.ToString() + "\r\n will rebuild the connection until succeed."); var _ = this.OnDisconnected; _?.Invoke(this, null); this.SetupTcpClientConnectionInBackground(); return; } } } }