using Edge.Core.Parser;
using Edge.Core.Parser.BinaryParser.MessageEntity;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Edge.Core.Processor
{
    /// <summary>
    /// Outbound side of a message channel. Every write raises <see cref="OnWriting"/>;
    /// the request/response overloads additionally watch the paired incoming channel's
    /// <c>OnMessageIncoming</c> event for a matching response until a timeout elapses.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic argument lists in this file were reconstructed from how
    /// each value is used. <c>TMessage</c> is the only type parameter the class body
    /// references; if <c>IOutgoing</c> declares further type parameters (for example a
    /// raw-data type), restore them on the class header below.
    /// </remarks>
    public class Outgoing<TMessage> : IOutgoing<TMessage> where TMessage : MessageBase
    {
        //static NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Communicator");

        // Static, so the logger assigned by the most recently constructed instance
        // is shared by every instance of this closed generic type.
        static ILogger logger = NullLogger.Instance;

        protected IIncoming<TMessage> incoming;

        /// <summary>
        /// Raised for every outbound message, carrying the message (and optional
        /// extra control parameter) in the event argument.
        /// </summary>
        public event EventHandler<OutgoingEventArg<TMessage>> OnWriting;

        /// <summary>
        /// Creates an instance bound to <paramref name="incoming"/>; the logger is left unchanged.
        /// </summary>
        /// <param name="incoming">incoming channel that is watched for responses.</param>
        public Outgoing(IIncoming<TMessage> incoming)
        {
            this.incoming = incoming;
        }

        /// <summary>
        /// Creates an instance bound to <paramref name="incoming"/> and, when
        /// <paramref name="services"/> is not null, replaces the shared logger with a
        /// "Communicator" logger created from the registered logger factory.
        /// </summary>
        /// <param name="incoming">incoming channel that is watched for responses.</param>
        /// <param name="services">service provider used to resolve the logger factory; may be null.</param>
        public Outgoing(IIncoming<TMessage> incoming, IServiceProvider services)
        {
            this.incoming = incoming;
            if (services != null)
            {
                var loggerFactory = services.GetRequiredService<ILoggerFactory>();
                logger = loggerFactory.CreateLogger("Communicator");
            }
        }

        /// <summary>
        /// Sends a request and invokes <paramref name="callback"/> exactly once: with the
        /// captured response, or with <c>null</c> when no response was captured in time.
        /// </summary>
        /// <param name="request">request to remote peer, which should trigger a response from remote peer.</param>
        /// <param name="responseCapture">
        /// predicate that correlates a response to the request; the first argument is the
        /// request, the second is the incoming message being tested.
        /// </param>
        /// <param name="callback">
        /// called when the response is captured or the wait timed out; the second argument
        /// is the response, or <c>null</c> on timeout.
        /// </param>
        /// <param name="timeout">time out, by ms; must be greater than zero.</param>
        /// <exception cref="ArgumentNullException"><paramref name="responseCapture"/> is null.</exception>
        /// <exception cref="ArgumentException">
        /// thrown by the timer constructor when <paramref name="timeout"/> is not greater than zero.
        /// </exception>
        public virtual void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
        {
            if (responseCapture == null)
            {
                throw new ArgumentNullException(nameof(responseCapture));
            }

            // Build the timer BEFORE subscribing: an invalid timeout throws here, and
            // doing it first guarantees no handler is left attached when it does.
            var timer = new System.Timers.Timer(timeout) { AutoReset = false };

            // 0 = callback not yet invoked, 1 = invoked; arbitrates the response/timeout race.
            int isCallbackCalled = 0;
            EventHandler eventHandler = null;
            eventHandler = (s, a) =>
            {
                var source = s as Incoming<TMessage>;
                if (source == null)
                {
                    // Sender is not the expected incoming type; ignore it rather than
                    // throwing a NullReferenceException into the event raiser.
                    return;
                }

                if (!responseCapture(request, source.Message))
                {
                    return;
                }

                this.incoming.OnMessageIncoming -= eventHandler;
                // The wait is over, so release the timer instead of letting it run to expiry.
                timer.Dispose();

                if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) == 0)
                {
                    // NOTE(review): set here and never reset in this class — presumably the
                    // incoming side consumes/resets it; confirm.
                    this.incoming.DisablePropagate = true;
                    try
                    {
                        callback(request, source.Message);
                    }
                    catch (Exception exx)
                    {
                        logger.LogError("Outgoing.WriteAsync(...,...,...,...) exceptioned in callback, detail: " + exx);
                    }
                }
            };

            this.incoming.OnMessageIncoming += eventHandler;

            timer.Elapsed += (sender, e) =>
            {
                this.incoming.OnMessageIncoming -= eventHandler;
                timer.Dispose();

                if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) == 0)
                {
                    try
                    {
                        callback(request, null);
                    }
                    catch (Exception exx)
                    {
                        // The timer component suppresses exceptions escaping an Elapsed
                        // handler, so log here or the failure would vanish silently.
                        logger.LogError("Outgoing.WriteAsync(...,...,...,...) exceptioned in timeout callback, detail: " + exx);
                    }
                }
            };
            timer.Start();

            this.OnWriting?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = request });
        }

        /// <summary>
        /// Sends a request and returns a task that completes with the captured response,
        /// or with <c>null</c> when no response was captured before the timeout.
        /// </summary>
        /// <param name="request">request to remote peer, which should trigger a response from remote peer.</param>
        /// <param name="responseCapture">
        /// predicate that correlates a response to the request; the first argument is the
        /// request, the second is the incoming message being tested.
        /// </param>
        /// <param name="timeout">time out, by ms.</param>
        /// <returns>a task whose result is the response, or <c>null</c> on timeout.</returns>
        public virtual Task<TMessage> WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, int timeout)
        {
            var tcs = new TaskCompletionSource<TMessage>();

            // response is null on timeout, so one call covers both outcomes. TrySetResult
            // keeps a second invocation (e.g. from an overriding implementation) from throwing.
            this.WriteAsync(
                request,
                responseCapture,
                (oriRequest, response) => tcs.TrySetResult(response),
                timeout);

            return tcs.Task;
        }

        /// <summary>
        /// Sends a message without waiting for any response.
        /// </summary>
        /// <param name="message">message to send.</param>
        public virtual void Write(TMessage message)
        {
            this.OnWriting?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message });
        }

        /// <summary>
        /// Sends a message together with an extra control parameter, without waiting
        /// for any response.
        /// </summary>
        /// <param name="message">message to send.</param>
        /// <param name="extraControlParameter">
        /// opaque value passed through to <see cref="OnWriting"/> subscribers in the event argument.
        /// </param>
        public virtual void Write(TMessage message, object extraControlParameter)
        {
            this.OnWriting?.Invoke(
                this,
                new OutgoingEventArg<TMessage>() { Message = message, ExtraControlParameter = extraControlParameter });
        }
    }
}