123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- using Edge.Core.Parser.BinaryParser.MessageEntity;
- using Edge.Core.Parser.BinaryParser.Util;
- using Edge.Core.Processor.Dispatcher.Attributes;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Logging.Abstractions;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Net;
- using System.Net.Sockets;
- using System.Runtime.InteropServices;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace Edge.Core.Processor.Communicator
- {
- [MetaPartsDescriptor(
- "lang-zh-cn:Tcp(本程序为服务器端)通讯器lang-en-us:Tcp(as server) communicator",
- "lang-zh-cn:基于TCP/IP技术的通讯器, FC作为服务器端等待客户端连接lang-en-us:TCP/IP based communicator, FC as the server and wait for connections")]
- public class TcpServerCommunicator<T> : ICommunicator<byte[], T> where T : MessageTemplateBase
- {
/// <summary>
/// Token source used to cancel the pending read on the exclusive client.
/// Assigned by the read loop; cancelled by the watch timer, by a failed write and by <see cref="Dispose"/>.
/// </summary>
private CancellationTokenSource readAsyncCancellationTokenSource;

/// <summary>
/// Time the last message was cut from the incoming stream, or the time the current client connected.
/// Null until the first client has connected or the first message has been cut.
/// </summary>
private DateTime? lastReceiveMsgDataFromTcpClientDateTime;

/// <summary>
/// Periodic timer that cancels the read when no data has been seen for too long.
/// Only created when the constructor argument enableClientSideActiveDetection is greater than 0.
/// </summary>
private readonly System.Timers.Timer clientSideActiveDetectionByCheckingLongTimeNoSeeDataIncomingWatchTimer;

/// <summary>
/// Logger used by this communicator; a no-op logger unless the constructor received a service provider.
/// </summary>
protected ILogger logger = NullLogger.Instance;

/// <summary>
/// Size, in bytes, of the buffer used for reading from the tcp connection.
/// </summary>
public static int tcpReceiveBufferSize = 1500;

/// <summary>
/// Serializes outgoing messages and deserializes incoming message bytes.
/// </summary>
private readonly Edge.Core.Parser.BinaryParser.ParserBase parser;

/// <summary>
/// Local port the tcp listener binds to.
/// </summary>
private readonly int localTcpServerListeningPort;

/// <summary>
/// The listener accepting incoming tcp connections; created in Start().
/// </summary>
protected TcpListener tcpListener;

/// <summary>
/// The single client currently served; null when no client is connected.
/// </summary>
private TcpClient exclusiveTcpClient;

/// <summary>
/// Remote end point of the exclusive client, for logging; "?:?" until a client has connected.
/// </summary>
private string exclusiveTcpClient_ClientRemoteEndPoint_Str = "?:?";

/// <summary>
/// Guards sending on the exclusive client so concurrent writes do not interleave.
/// </summary>
private readonly object syncObject = new object();

/// <summary>
/// Guards feeding received bytes into the message cutter.
/// </summary>
private readonly object syncObject_Feed = new object();

/// <summary>
/// 0 for not started, 1 for started.
/// </summary>
private int isStarted = 0;

/// <summary>
/// Set to true once Dispose() has run.
/// </summary>
protected bool disposed = false;

/// <summary>
/// Cuts the raw incoming byte stream into individual messages.
/// </summary>
private readonly IMessageCutter<byte[]> messageCutter;

/// <summary>
/// Raised after an incoming tcp client has been chosen as the exclusive client.
/// </summary>
public event EventHandler OnConnected;

/// <summary>
/// Raised when the exclusive client is considered disconnected (read failure or send failure).
/// </summary>
public event EventHandler OnDisconnected;

/// <summary>
/// Raised when the message cutter reports invalid data, or when cut bytes fail to deserialize.
/// </summary>
public event EventHandler<CommunicatorErrorMessageReadEventArg> OnErrorMessageRead;

/// <summary>
/// Raised with the serialized bytes right before they are sent; a handler may set Continue to false to abort the write.
/// </summary>
public event EventHandler<CommunicatorEventArg<byte[], T>> OnRawDataWriting;

/// <summary>
/// Raised for every message that was cut and deserialized from the incoming stream.
/// </summary>
public event EventHandler<CommunicatorEventArg<byte[], T>> OnDataReceived;

/// <summary>
/// fired once raw data read from underlying tcp connection, without message cutting yet.
/// </summary>
protected Action<byte[]> OnRawDataReceived;

/// <summary>
/// Identity of this communicator; the constructor initializes it to "*:" followed by the listening port.
/// </summary>
public string Identity { get; set; }
/// <summary>
/// Creates a tcp server communicator: wires the message cutter events to the parser and,
/// optionally, starts a watch timer that cancels the read when the client stays silent for too long.
/// The listener itself is not opened here; call Start() for that.
/// </summary>
/// <param name="binaryMsgCutter">Cuts the incoming byte stream into messages. When null, no cutter events are wired.</param>
/// <param name="parser">Parser used to deserialize cut messages and serialize outgoing ones.</param>
/// <param name="localTcpServerTcpListeningPortNumber">Local tcp port to listen on.</param>
/// <param name="enableClientSideActiveDetection">
/// When greater than 0, the number of seconds without a received message after which the pending read is cancelled.
/// 0 or less disables the detection.
/// </param>
/// <param name="enableClientSideIdentityRegistration">Not used by this constructor.</param>
/// <param name="enableClientSideConnLevelHeartbeat">Not used by this constructor.</param>
/// <param name="customLoggerFileName">
/// Null or empty selects the shared "Communicator" logger; "*" selects a logger named after the listening port;
/// any other value selects a logger named after that value.
/// </param>
/// <param name="services">Service provider used to resolve the logger factory. When null, logging is a no-op.</param>
[ParamsJsonSchemas("TcpServerCommunicatorCtorSchema")]
public TcpServerCommunicator(IMessageCutter<byte[]> binaryMsgCutter,
    Edge.Core.Parser.BinaryParser.ParserBase parser,
    int localTcpServerTcpListeningPortNumber,
    int enableClientSideActiveDetection,
    string enableClientSideIdentityRegistration,
    string enableClientSideConnLevelHeartbeat,
    string customLoggerFileName,
    IServiceProvider services)
{
    this.Identity = "*:" + localTcpServerTcpListeningPortNumber;

    if (services != null)
    {
        var loggerFactory = services.GetRequiredService<ILoggerFactory>();
        string loggerName;
        if (string.IsNullOrEmpty(customLoggerFileName))
        {
            loggerName = "Communicator";
        }
        else if (customLoggerFileName == "*")
        {
            loggerName = "DynamicPrivate_Comm_" + localTcpServerTcpListeningPortNumber;
        }
        else
        {
            loggerName = "DynamicPrivate_Comm_" + customLoggerFileName;
        }

        this.logger = loggerFactory.CreateLogger(loggerName);
    }

    this.messageCutter = binaryMsgCutter;
    this.parser = parser;
    this.localTcpServerListeningPort = localTcpServerTcpListeningPortNumber;

    if (this.messageCutter != null)
    {
        this.messageCutter.OnInvalidMessageRead += (sender, invalidArgs) =>
        {
            var loggingStr = $"Bytes msg from tcp client: {exclusiveTcpClient_ClientRemoteEndPoint_Str} Read Invalid data, detail: {(invalidArgs?.Message ?? "")}";
            this.logger.LogInformation(loggingStr);
            this.OnErrorMessageRead?.Invoke(this, new CommunicatorErrorMessageReadEventArg(null, loggingStr));
        };
        this.messageCutter.OnMessageCut += (sender, cutArgs) => this.HandleCutMessage();
    }

    if (enableClientSideActiveDetection > 0)
    {
        var watchTimer = new System.Timers.Timer { Interval = 2000 };
        watchTimer.Elapsed += (sender, elapsedArgs) => this.CancelReadIfClientSilentFor(enableClientSideActiveDetection);
        this.clientSideActiveDetectionByCheckingLongTimeNoSeeDataIncomingWatchTimer = watchTimer;
        watchTimer.Start();
    }
}

/// <summary>
/// Deserializes the message currently held by the message cutter and raises OnDataReceived for it.
/// Any deserialization failure is logged and reported through OnErrorMessageRead instead.
/// </summary>
private void HandleCutMessage()
{
    this.lastReceiveMsgDataFromTcpClientDateTime = DateTime.Now;

    var rawMessage = this.messageCutter.Message;
    var eventArg = new CommunicatorEventArg<byte[], T>();
    try
    {
        eventArg.Data = rawMessage;
        eventArg.Message = this.parser.Deserialize(rawMessage.ToArray()) as T;

        // A failed cast must be an error regardless of the log level; subscribers of
        // OnDataReceived should never be handed a null message.
        if (eventArg.Message == null)
        {
            throw new InvalidCastException("the deserialized message is null or is not of type " + typeof(T).FullName);
        }

        if (logger.IsEnabled(LogLevel.Debug))
        {
            this.logger.LogDebug("      Parsed: " + eventArg.Message.ToLogString());
        }
    }
    catch (Exception ex)
    {
        var loggingStr = "Message from " + exclusiveTcpClient_ClientRemoteEndPoint_Str
            + " exceptioned in deserilaizing bytes:\r\n 0x" + rawMessage.ToHexLogString() + "\r\n exception detail:\r\n" + ex;
        this.logger.LogError(loggingStr);
        this.OnErrorMessageRead?.Invoke(this, new CommunicatorErrorMessageReadEventArg(rawMessage, loggingStr));
        return;
    }

    try
    {
        this.OnDataReceived?.Invoke(this, eventArg);
    }
    catch (Exception ex)
    {
        // Building the log text must not throw out of the error path itself.
        string messageLogStr;
        try
        {
            messageLogStr = eventArg.Message?.ToLogString();
        }
        catch (Exception)
        {
            messageLogStr = "exceptioned for get ToLogString()";
        }

        this.logger.LogError("Message from " + exclusiveTcpClient_ClientRemoteEndPoint_Str
            + " exceptioned in handle message:\r\n" + messageLogStr + "\r\n exceptioned detail: \r\n" + ex);
    }
}

/// <summary>
/// Cancels the pending read when a client is connected and no data has been seen for at least
/// <paramref name="silentSeconds"/> seconds.
/// </summary>
/// <param name="silentSeconds">Allowed silence, in seconds, before the read is cancelled.</param>
private void CancelReadIfClientSilentFor(int silentSeconds)
{
    // null indicates this communicator just started and has not seen any client yet.
    var lastSeen = this.lastReceiveMsgDataFromTcpClientDateTime;
    if (lastSeen == null)
    {
        return;
    }

    var silentFor = DateTime.Now.Subtract(lastSeen.Value).TotalSeconds;
    if (silentFor >= silentSeconds && this.exclusiveTcpClient != null)
    {
        this.logger.LogInformation($"Long time no see data from tcp client with: { this.exclusiveTcpClient_ClientRemoteEndPoint_Str }, will actively disconnect it...");
        this.readAsyncCancellationTokenSource?.Cancel();
    }
}
/// <summary>
/// Stops the communicator: marks it as not started, cancels the pending read, stops the listener,
/// closes the connected client and stops the watch timer. Never throws; each step is attempted
/// even when a previous one failed.
/// </summary>
public void Dispose()
{
    Interlocked.Exchange(ref this.isStarted, 0);

    try
    {
        this.readAsyncCancellationTokenSource?.Cancel();
    }
    catch (Exception ex)
    {
        this.logger.LogDebug("Cancel the tcp read in Dispose() exceptioned: " + ex);
    }

    try
    {
        this.tcpListener?.Stop();
    }
    catch (Exception ex)
    {
        this.logger.LogError("Stop tcp listener in Dispose() exceptioned: " + ex);
    }

    try
    {
        // Close the socket so it is released even when no read is pending to observe the
        // cancellation. The field is intentionally not cleared here: the read loop clears it
        // when it sees the read fail.
        this.exclusiveTcpClient?.Close();
    }
    catch (Exception ex)
    {
        this.logger.LogDebug("Close tcp client in Dispose() exceptioned: " + ex);
    }

    try
    {
        var watchTimer = this.clientSideActiveDetectionByCheckingLongTimeNoSeeDataIncomingWatchTimer;
        if (watchTimer != null)
        {
            watchTimer.Stop();
            watchTimer.Dispose();
        }
    }
    catch (Exception ex)
    {
        this.logger.LogDebug("Stop watch timer in Dispose() exceptioned: " + ex);
    }

    this.disposed = true;
}
/// <summary>
/// Stops the communicator by disposing it.
/// </summary>
/// <returns>
/// A completed task with result true; a faulted task when disposing threw, so callers keep
/// observing failures through the returned task.
/// </returns>
public virtual Task<bool> Stop()
{
    // There is nothing to await here, so no async state machine is needed.
    try
    {
        this.Dispose();
        return Task.FromResult(true);
    }
    catch (Exception ex)
    {
        return Task.FromException<bool>(ex);
    }
}
/// <summary>
/// Starts listening on the configured local port and launches a background loop that accepts
/// tcp clients. Only one client is served at a time; further clients are closed while one is connected.
/// </summary>
/// <returns>
/// A completed task with true when the listener was started; false when the communicator was
/// already started or the listener could not be started.
/// </returns>
public virtual Task<bool> Start()
{
    if (0 != Interlocked.CompareExchange(ref this.isStarted, 1, 0))
    {
        return Task.FromResult(false);
    }

    TcpListener listener;
    try
    {
        listener = new TcpListener(IPAddress.Any, this.localTcpServerListeningPort);
        this.tcpListener = listener;
        listener.Start();
        this.logger.LogInformation($"TcpListener listened on localPort: {this.localTcpServerListeningPort}");
    }
    catch (Exception exxx)
    {
        logger.LogError($"Start tcp listener on port: {this.localTcpServerListeningPort} exceptioned: {exxx}");
        // Roll the flag back, otherwise a later Start() could never retry.
        Interlocked.Exchange(ref this.isStarted, 0);
        return Task.FromResult(false);
    }

    _ = Task.Run(() => this.AcceptClientsLoopAsync(listener));
    return Task.FromResult(true);
}

/// <summary>
/// True while the communicator is started and <paramref name="listener"/> is still the current listener.
/// </summary>
private bool IsListenerActive(TcpListener listener)
{
    return this.isStarted == 1 && ReferenceEquals(listener, this.tcpListener);
}

/// <summary>
/// Reads the remote end point of a client for logging; falls back to "?:?" when it is not available.
/// </summary>
private static string GetRemoteEndPointString(TcpClient client)
{
    try
    {
        return client.Client.RemoteEndPoint?.ToString() ?? "?:?";
    }
    catch (Exception)
    {
        return "?:?";
    }
}

/// <summary>
/// Accepts incoming tcp clients until the listener is stopped. The first client becomes the
/// exclusive client and is served on its own task; clients arriving meanwhile are closed.
/// </summary>
private async Task AcceptClientsLoopAsync(TcpListener listener)
{
    while (this.IsListenerActive(listener))
    {
        TcpClient newTcpClient;
        try
        {
            logger.LogInformation($"Waitting for connection on localPort: {this.localTcpServerListeningPort}");
            newTcpClient = await listener.AcceptTcpClientAsync();
        }
        catch (Exception ex)
        {
            if (!this.IsListenerActive(listener))
            {
                // The listener was stopped; the accept failing is the expected way out.
                break;
            }

            this.logger.LogError($"Accepting tcp client on localPort: {this.localTcpServerListeningPort} exceptioned: {ex}");
            // Avoid a tight loop when the accept keeps failing.
            await Task.Delay(1000);
            continue;
        }

        var remoteEndPointStr = GetRemoteEndPointString(newTcpClient);
        this.logger.LogInformation($"   A tcp client with remote ip/port: {remoteEndPointStr} has connected in");
        if (this.exclusiveTcpClient != null)
        {
            logger.LogInformation($"   There's already a previous TcpClient established as exclusive, so close this new one with remote ip/port: {remoteEndPointStr}");
            try
            {
                newTcpClient.Close();
            }
            catch (Exception ex)
            {
                this.logger.LogDebug($"Close the rejected tcp client: {remoteEndPointStr} exceptioned: {ex}");
            }

            continue;
        }

        this.exclusiveTcpClient = newTcpClient;
        this.exclusiveTcpClient_ClientRemoteEndPoint_Str = remoteEndPointStr;
        this.logger.LogInformation($"   Tcp client with remote ip/port: {remoteEndPointStr} has been chosen as exclusive");
        _ = Task.Run(() => this.ServeExclusiveClientAsync(newTcpClient));
    }
}

/// <summary>
/// Tries to enable tcp keep-alive on a client socket (skipped on Windows). Failures are logged only.
/// </summary>
private void TryEnableTcpKeepAlive(TcpClient client)
{
    if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
    {
        return;
    }

    try
    {
        //overall switch to enable the feature.
        client.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
        //when entering tcp idle, how much time wait before send a KeepAlive package.
        client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, 20);
        //when in tcp idle, and failed to received previous KeepAlive response, sequence of KeepAlive packages will send by this interval, until response received, or TcpKeepAliveRetryCount reached.
        client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, 8);
        //when in tcp idle, how many KeepAlive package response have not received will treat as tcp broken, and trigger tcp disconnected exception.
        client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, 3);
    }
    catch (Exception ex)
    {
        this.logger.LogError($"Enable tcp keep alive for tcp client: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned: {ex}");
    }
}

/// <summary>
/// Reads from the exclusive client until the connection breaks, the read is cancelled or the
/// communicator is stopped, feeding every chunk into the message cutter. On exit the client is
/// always closed, the exclusive slot is freed and OnDisconnected is raised.
/// </summary>
private async Task ServeExclusiveClientAsync(TcpClient client)
{
    // One token source per connection: a Cancel() requested while a chunk is being processed
    // is still observed by the next read. It is not disposed here because other threads
    // (watch timer, Write, Dispose) may still call Cancel() on it.
    var readCancellation = new CancellationTokenSource();
    this.readAsyncCancellationTokenSource = readCancellation;
    try
    {
        this.TryEnableTcpKeepAlive(client);
        this.lastReceiveMsgDataFromTcpClientDateTime = DateTime.Now;
        try
        {
            this.OnConnected?.Invoke(this, null);
        }
        catch (Exception ex)
        {
            // A faulty subscriber must not leave the exclusive slot occupied by an unserved client.
            this.logger.LogError($"OnConnected handler for tcp client: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned: {ex}");
        }

        var stream = client.GetStream();
        var buffer = new byte[tcpReceiveBufferSize];
        while (this.isStarted == 1)
        {
            int bytesReadCount;
            try
            {
                bytesReadCount = await stream.ReadAsync(buffer, readCancellation.Token);
            }
            catch (Exception eeee)
            {
                logger.LogError($"tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned in GetStream().ReadAsync(), treat as tcp disconnection, detail: {eeee}");
                break;
            }

            if (bytesReadCount == 0)
            {
                logger.LogError($"tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned in GetStream().ReadAsync(), treat as tcp disconnection, detail: tcp server received 0 count data indicates the connection is broken");
                break;
            }

            // Copy the received bytes out, the buffer is reused by the next read.
            var data = new byte[bytesReadCount];
            Buffer.BlockCopy(buffer, 0, data, 0, bytesReadCount);

            if (logger.IsEnabled(LogLevel.Debug))
            {
                this.logger.LogDebug($"TCP from { this.exclusiveTcpClient_ClientRemoteEndPoint_Str } <---Incoming: 0x" + data.ToHexLogString());
            }

            try
            {
                this.OnRawDataReceived?.Invoke(data);
            }
            catch (Exception ex)
            {
                this.logger.LogError($"OnRawDataReceived handler exceptioned: {ex}");
            }

            try
            {
                lock (this.syncObject_Feed)
                {
                    this.messageCutter.Feed(data, this.localTcpServerListeningPort);
                }
            }
            catch (Exception ex)
            {
                this.logger.LogError($"Exception in Parsing msg bytes: 0x{ data.ToHexLogString()}, detail: {Environment.NewLine}{ ex.ToString()}");
            }
        }
    }
    catch (Exception ex)
    {
        this.logger.LogError($"Serving tcp client with remote ip/port: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned unexpectedly: {ex}");
    }
    finally
    {
        try
        {
            client.Close();
        }
        catch (Exception ex)
        {
            this.logger.LogDebug($"Close tcp client: {this.exclusiveTcpClient_ClientRemoteEndPoint_Str} exceptioned: {ex}");
        }

        // Free the exclusive slot, but only if it still holds this very client.
        Interlocked.CompareExchange(ref this.exclusiveTcpClient, null, client);

        try
        {
            this.OnDisconnected?.Invoke(this, null);
        }
        catch (Exception ex)
        {
            this.logger.LogError($"OnDisconnected handler exceptioned: {ex}");
        }
    }
}
/// <summary>
/// Serializes <paramref name="message"/> and sends it to the connected tcp client.
/// OnRawDataWriting is raised before sending; a handler may set Continue to false to abort.
/// A failed send is treated as a broken connection: the pending read is cancelled and
/// OnDisconnected is raised.
/// </summary>
/// <param name="message">The message to send; null is rejected.</param>
/// <returns>
/// true when the bytes were handed to the socket; false when no client is connected, the message
/// is null, serialization failed or produced no bytes, a handler aborted the write, or the send failed.
/// </returns>
public bool Write(T message)
{
    if (this.exclusiveTcpClient == null || message == null)
    {
        return false;
    }

    byte[] rawData;
    try
    {
        rawData = this.parser.Serialize(message);
        var arg = new CommunicatorEventArg<byte[], T>() { Data = rawData, Message = message, Continue = true };
        this.OnRawDataWriting?.Invoke(this, arg);
        if (this.exclusiveTcpClient == null || !arg.Continue)
        {
            this.logger.LogError("Write failed, this.tcpClient is null: " + (this.exclusiveTcpClient is null));
            return false;
        }
    }
    catch (Exception exx)
    {
        var msgLogStr = "";
        try
        {
            msgLogStr = message.ToLogString();
        }
        catch
        {
            msgLogStr = "exceptioned for get ToLogString()";
        }

        this.logger.LogError("Tcp Write failed in serialize or event raise for msg: " + message.GetType() + " -> " + msgLogStr + "\r\n detail: " + exx);
        return false;
    }

    if (rawData == null || rawData.Length == 0)
    {
        // Sending nothing would report 0 bytes sent, which must not be mistaken for a broken connection.
        this.logger.LogError("Tcp Write failed, serialized data is null or empty for msg: " + message.GetType());
        return false;
    }

    if (logger.IsEnabled(LogLevel.Debug))
    {
        // Diagnostics only: a failure to render the message must never fail the send.
        try
        {
            this.logger.LogDebug("TCP to " + (this.exclusiveTcpClient_ClientRemoteEndPoint_Str) + " Outgoing--->: " + message.ToLogString() + "\r\n 0x" + rawData.ToHexLogString());
        }
        catch (Exception logEx)
        {
            this.logger.LogDebug("TCP to " + (this.exclusiveTcpClient_ClientRemoteEndPoint_Str) + " Outgoing--->: exceptioned for get ToLogString(): " + logEx);
        }
    }

    lock (this.syncObject)
    {
        try
        {
            // Snapshot the field; the read loop may clear it concurrently.
            var client = this.exclusiveTcpClient;
            if (client == null)
            {
                throw new InvalidOperationException("the tcp client has been disconnected before sending");
            }

            var sendCount = client.Client.Send(rawData);
            if (sendCount == 0)
            {
                throw new InvalidOperationException("the send count in this.exclusiveTcpClient.Client.Send is 0");
            }

            return true;
        }
        catch (Exception exx)
        {
            this.logger.LogError("Send tcp msg to "
                + (this.exclusiveTcpClient_ClientRemoteEndPoint_Str) + " Write(...) exceptioned, treat as a broken tcp connection, will cancel the data read as well, detail: " + exx);
            try
            {
                this.readAsyncCancellationTokenSource?.Cancel();
            }
            catch (Exception cancelEx)
            {
                this.logger.LogDebug("Cancel the tcp read after a failed Write(...) exceptioned: " + cancelEx);
            }
        }
    }

    // Raised outside the send lock so a slow or re-entrant subscriber cannot block other writers.
    try
    {
        this.OnDisconnected?.Invoke(this, null);
    }
    catch (Exception handlerEx)
    {
        this.logger.LogError("OnDisconnected handler exceptioned after a failed Write(...): " + handlerEx);
    }

    return false;
}
/// <summary>
/// Writing with an extra control parameter is not supported by this communicator.
/// </summary>
/// <param name="message">Ignored.</param>
/// <param name="extraControlParameter">Ignored.</param>
/// <returns>Never returns.</returns>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public bool Write(T message, object extraControlParameter) => throw new NotImplementedException();
- }
- }
|