using Edge.Core.Parser;
using Edge.Core.Parser.BinaryParser.MessageEntity;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Edge.Core.Processor
{
    /// <summary>
    /// Publishes outgoing messages by raising <see cref="OnWriting"/>, and offers
    /// request/response helpers that watch the paired incoming channel's
    /// <c>OnMessageIncoming</c> event for a matching response.
    /// </summary>
    /// <typeparam name="TRaw">Raw type parameter required by <c>IOutgoing</c>; not used by this class itself.</typeparam>
    /// <typeparam name="TMessage">Message type carried by this channel.</typeparam>
    public class Outgoing<TRaw, TMessage> : IOutgoing<TRaw, TMessage> where TMessage : MessageBase
    {
        // Static, so one logger is shared by every instance of the same closed generic type.
        // It is replaced whenever an instance is constructed with a non-null service provider.
        static ILogger logger = NullLogger.Instance;

        protected IIncoming<TMessage> incoming;

        /// <summary>
        /// Raised for every message that is written through this instance.
        /// </summary>
        public event EventHandler<OutgoingEventArg<TMessage>> OnWriting;

        /// <summary>
        /// Creates an instance bound to the given incoming channel; logging stays on the current (default: null) logger.
        /// </summary>
        /// <param name="incoming">incoming channel whose messages are inspected for responses.</param>
        public Outgoing(IIncoming<TMessage> incoming)
        {
            this.incoming = incoming;
        }

        /// <summary>
        /// Creates an instance bound to the given incoming channel and, when <paramref name="services"/> is not null,
        /// resolves an <see cref="ILoggerFactory"/> from it to create the "Communicator" logger.
        /// </summary>
        /// <param name="incoming">incoming channel whose messages are inspected for responses.</param>
        /// <param name="services">service provider used to resolve the logger factory; may be null.</param>
        public Outgoing(IIncoming<TMessage> incoming, IServiceProvider services)
        {
            this.incoming = incoming;
            if (services != null)
            {
                var loggerFactory = services.GetRequiredService<ILoggerFactory>();
                logger = loggerFactory.CreateLogger("Communicator");
            }
        }

        /// <summary>
        /// Writes a request and invokes <paramref name="callback"/> exactly once: either with the first incoming
        /// message accepted by <paramref name="responseCapture"/>, or with null once the timeout elapses.
        /// </summary>
        /// <param name="request">request to remote peer, which should trigger a response from remote peer.</param>
        /// <param name="responseCapture">predicate correlating a response to the request; first argument is the request, second is the incoming message being tested.</param>
        /// <param name="callback">called when the response is captured (request, response) or when timed out (request, null).</param>
        /// <param name="timeout">time out, by ms; must be greater than zero.</param>
        /// <exception cref="ArgumentException">thrown by the timer constructor when <paramref name="timeout"/> is not greater than zero.</exception>
        public virtual void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
        {
            int isCallbackCalled = 0;

            // Build the timer before subscribing: its constructor throws for a non-positive
            // interval, and in that case no handler may be left attached to the incoming event.
            var timer = new System.Timers.Timer(timeout) { AutoReset = false };

            EventHandler eventHandler = null;
            eventHandler = (s, a) =>
            {
                var source = s as Incoming<TMessage>;
                if (source == null)
                {
                    // Sender is not the expected type, nothing to inspect.
                    return;
                }

                // Read the message once so the instance handed to the callback is the one that matched.
                var response = source.Message;
                if (!responseCapture(request, response))
                {
                    return;
                }

                this.incoming.OnMessageIncoming -= eventHandler;
                if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) != 0)
                {
                    // The timeout path already reported the result.
                    return;
                }

                // The response won the race, so the timeout timer is no longer needed.
                timer.Dispose();
                this.incoming.DisablePropagate = true;
                try
                {
                    callback(request, response);
                }
                catch (Exception exx)
                {
                    logger.LogError("Outgoing.WriteAsync(...,...,...,...) exceptioned in callback, detail: " + exx);
                }
            };

            this.incoming.OnMessageIncoming += eventHandler;

            timer.Elapsed += (sender, e) =>
            {
                timer.Dispose();
                this.incoming.OnMessageIncoming -= eventHandler;
                if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) == 0)
                {
                    try
                    {
                        callback(request, null);
                    }
                    catch (Exception exx)
                    {
                        // The timer suppresses exceptions thrown by Elapsed handlers; log so the failure is visible.
                        logger.LogError("Outgoing.WriteAsync(...,...,...,...) exceptioned in timeout callback, detail: " + exx);
                    }
                }
            };
            timer.Start();

            var safe = this.OnWriting;
            safe?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = request });
        }

        /// <summary>
        /// Task based variant of the callback overload.
        /// </summary>
        /// <param name="request">request to remote peer, which should trigger a response from remote peer.</param>
        /// <param name="responseCapture">predicate correlating a response to the request; first argument is the request, second is the incoming message being tested.</param>
        /// <param name="timeout">time out, by ms.</param>
        /// <returns>a task that completes with the captured response, or with null when timed out.</returns>
        public virtual Task<TMessage> WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, int timeout)
        {
            var tcs = new TaskCompletionSource<TMessage>();
            // response is null on timeout, which is exactly the value to publish in that case.
            this.WriteAsync(request, responseCapture, (oriRequest, response) => tcs.SetResult(response), timeout);
            return tcs.Task;
        }

        /// <summary>
        /// Writes a message without waiting for any response, by raising <see cref="OnWriting"/>.
        /// </summary>
        /// <param name="message">message to write.</param>
        public virtual void Write(TMessage message)
        {
            var safe = this.OnWriting;
            safe?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message });
        }

        /// <summary>
        /// Writes a message without waiting for any response, passing an extra control object along in the event argument.
        /// </summary>
        /// <param name="message">message to write.</param>
        /// <param name="extraControlParameter">value copied to the event argument's ExtraControlParameter for the <see cref="OnWriting"/> subscribers.</param>
        public virtual void Write(TMessage message, object extraControlParameter)
        {
            var safe = this.OnWriting;
            safe?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message, ExtraControlParameter = extraControlParameter });
        }
    }
}