using Edge.Core.Parser;
using Edge.Core.Parser.BinaryParser.MessageEntity;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Edge.Core.Processor
{
    /// <summary>
    /// Sends messages to the remote peer by raising <see cref="OnWriting"/> and, for the
    /// request/response overloads, correlates a later incoming message (or a timeout) with the request.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type-argument lists in this file were missing from the text under
    /// review and have been restored from how each member is used. The class-level parameter list
    /// (<c>TRaw, TMessage</c>), <c>ILoggerFactory</c> and <c>Incoming&lt;TMessage&gt;</c> in particular
    /// must be confirmed against the declarations of IOutgoing, IIncoming and Incoming.
    /// </remarks>
    public class Outgoing<TRaw, TMessage> : IOutgoing<TRaw, TMessage> where TMessage : MessageBase
    {
        private static readonly NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Outgoing");

        /// <summary>Source of incoming messages used to capture responses.</summary>
        protected IIncoming<TMessage> incoming;

        /// <summary>Raised for every message that is to be written to the remote peer.</summary>
        public event EventHandler<OutgoingEventArg<TMessage>> OnWriting;

        // Connection signal: null until OnConnect() has been called, re-created on every OnConnect().
        // Nothing in this class completes it (see OnDisconnect).
        private TaskCompletionSource<bool> isConnectTask;

        /// <summary>Creates an instance that captures responses from <paramref name="incoming"/>.</summary>
        public Outgoing(IIncoming<TMessage> incoming)
        {
            this.incoming = incoming;
        }

        /// <summary>
        /// Creates an instance that captures responses from <paramref name="incoming"/>.
        /// When <paramref name="services"/> is not null, a logger factory must be resolvable from it.
        /// </summary>
        public Outgoing(IIncoming<TMessage> incoming, IServiceProvider services)
            : this(incoming)
        {
            if (services != null)
            {
                // The resolved factory is currently unused (logging goes through NLog), but the
                // lookup is kept so that a missing registration still fails at construction time.
                _ = services.GetRequiredService<ILoggerFactory>();
            }
        }

        /// <summary>
        /// Notifies this instance that the current link is connected.
        /// </summary>
        public void OnConnect()
        {
            logger.Info("当前已连接");
            isConnectTask = new TaskCompletionSource<bool>();
        }

        /// <summary>
        /// Notifies this instance that the current link is disconnected.
        /// </summary>
        public void OnDisconnect()
        {
            logger.Info("当前已断开连接");
            // NOTE(review): completing the connection signal here (isConnectTask.SetResult(false)) was
            // disabled in the original code and is left disabled. As a consequence
            // WriteAsyncAndCheckIsConnect only reports DISCONNECT when OnConnect() was never called.
            // Confirm whether that is intended before re-enabling it.
        }

        /// <summary>
        /// Sends <paramref name="request"/> and invokes <paramref name="callback"/> exactly once:
        /// either with the first incoming message accepted by <paramref name="responseCapture"/>,
        /// or with <c>null</c> when <paramref name="timeout"/> elapses first.
        /// </summary>
        /// <param name="request">Request to the remote peer, which should trigger a response from it.</param>
        /// <param name="responseCapture">
        /// Predicate that correlates a response with the request; the first argument is the request,
        /// the second is the incoming message being tested.
        /// </param>
        /// <param name="callback">
        /// Called with the request and the captured response, or with the request and <c>null</c> on timeout.
        /// Exceptions thrown by the callback are logged and not propagated.
        /// </param>
        /// <param name="timeout">Time out in milliseconds; must be accepted by <see cref="System.Timers.Timer"/>.</param>
        public virtual void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
        {
            // 0 = callback not yet invoked, 1 = invoked (by the response path or by the timeout path).
            int isCallbackCalled = 0;

            // Created before subscribing so that an invalid timeout does not leave a dangling handler.
            // AutoReset = false makes the timer fire at most once.
            var timer = new System.Timers.Timer(timeout) { AutoReset = false };

            EventHandler eventHandler = null;
            eventHandler = (s, a) =>
            {
                // Senders that are not an Incoming<TMessage> cannot carry a response and are ignored.
                if (s is Incoming<TMessage> source && responseCapture(request, source.Message))
                {
                    this.incoming.OnMessageIncoming -= eventHandler;
                    if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) == 0)
                    {
                        // The response won the race: the timeout timer is no longer needed.
                        timer.Dispose();
                        this.incoming.DisablePropagate = true;
                        try
                        {
                            callback(request, source.Message);
                        }
                        catch (Exception exx)
                        {
                            logger.Error("Outgoing.WriteAsync(...,...,...,...) exceptioned in callback, detail: " + exx);
                        }
                    }
                }
            };

            timer.Elapsed += (sender, e) =>
            {
                timer.Dispose();
                this.incoming.OnMessageIncoming -= eventHandler;
                if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) == 0)
                {
                    try
                    {
                        callback(request, null);
                    }
                    catch (Exception exx)
                    {
                        logger.Error("Outgoing.WriteAsync(...,...,...,...) exceptioned in timeout callback, detail: " + exx);
                    }
                }
            };

            this.incoming.OnMessageIncoming += eventHandler;
            timer.Start();

            this.OnWriting?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = request });
        }

        /// <summary>
        /// Task-based form of the callback overload: sends <paramref name="request"/> and completes
        /// with the captured response, or with <c>null</c> when <paramref name="timeout"/> elapses first.
        /// </summary>
        /// <param name="request">Request to the remote peer.</param>
        /// <param name="responseCapture">Predicate that correlates an incoming message with the request.</param>
        /// <param name="timeout">Time out in milliseconds.</param>
        /// <returns>A task whose result is the response, or <c>null</c> on timeout.</returns>
        public virtual Task<TMessage> WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, int timeout)
        {
            var tcs = new TaskCompletionSource<TMessage>();
            this.WriteAsync(
                request,
                responseCapture,
                // response is null on timeout, which is exactly the value to publish in that case.
                (oriRequest, response) => tcs.TrySetResult(response),
                timeout);
            return tcs.Task;
        }

        /// <summary>
        /// Sends <paramref name="request"/> like the task-based WriteAsync overload, but also takes
        /// the connection signal into account.
        /// </summary>
        /// <param name="request">Request to the remote peer.</param>
        /// <param name="responseCapture">Predicate that correlates an incoming message with the request.</param>
        /// <param name="timeout">Time out in milliseconds.</param>
        /// <returns>
        /// <see cref="WriteResponseType.DISCONNECT"/> when <see cref="OnConnect"/> has not been called yet
        /// (nothing is sent) or when the connection signal completes first;
        /// <see cref="WriteResponseType.OK"/> with the response in <c>Data</c> when one was captured;
        /// <see cref="WriteResponseType.TIME_OUT"/> with <c>Data == null</c> otherwise.
        /// </returns>
        public virtual async Task<WriteRepsonse> WriteAsyncAndCheckIsConnect(TMessage request, Func<TMessage, TMessage, bool> responseCapture, int timeout)
        {
            // Snapshot the field once: OnConnect() may replace it while this call is in flight, and
            // the task handed to WhenAny must be the same one that is compared against afterwards.
            var connectionSignal = isConnectTask;
            if (connectionSignal == null)
            {
                return new WriteRepsonse()
                {
                    ResponseType = WriteResponseType.DISCONNECT,
                    Data = false,
                };
            }

            Task<TMessage> sendTask = WriteAsync(request, responseCapture, timeout);
            Task firstCompleted = await Task.WhenAny(sendTask, connectionSignal.Task);

            if (firstCompleted == connectionSignal.Task)
            {
                return new WriteRepsonse()
                {
                    ResponseType = WriteResponseType.DISCONNECT,
                    Data = await connectionSignal.Task,
                };
            }

            TMessage response = await sendTask;
            return new WriteRepsonse()
            {
                ResponseType = response != null ? WriteResponseType.OK : WriteResponseType.TIME_OUT,
                Data = response,
            };
        }

        /// <summary>
        /// Sends <paramref name="message"/> without waiting for any response.
        /// </summary>
        /// <param name="message">Message to hand to the <see cref="OnWriting"/> subscribers.</param>
        public virtual void Write(TMessage message)
        {
            this.OnWriting?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message });
        }

        /// <summary>
        /// Sends <paramref name="message"/> without waiting for any response, passing
        /// <paramref name="extraControlParameter"/> along to the <see cref="OnWriting"/> subscribers.
        /// </summary>
        /// <param name="message">Message to hand to the subscribers.</param>
        /// <param name="extraControlParameter">Opaque value forwarded unchanged in the event argument.</param>
        public virtual void Write(TMessage message, object extraControlParameter)
        {
            this.OnWriting?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message, ExtraControlParameter = extraControlParameter });
        }
    }

    /// <summary>Outcome of <c>WriteAsyncAndCheckIsConnect</c>.</summary>
    public class WriteRepsonse
    {
        /// <summary>How the write finished.</summary>
        public WriteResponseType ResponseType { get; set; }

        /// <summary>
        /// The captured response for <see cref="WriteResponseType.OK"/>, <c>null</c> for
        /// <see cref="WriteResponseType.TIME_OUT"/>, or a boxed <see cref="bool"/> for
        /// <see cref="WriteResponseType.DISCONNECT"/>.
        /// </summary>
        public object? Data { get; set; }
    }

    /// <summary>Possible outcomes of a connection-aware write.</summary>
    public enum WriteResponseType
    {
        /// <summary>No response was captured before the timeout elapsed.</summary>
        TIME_OUT,

        /// <summary>The link was not connected, or the connection signal completed first.</summary>
        DISCONNECT,

        /// <summary>A response was captured.</summary>
        OK
    }
}