123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- using Edge.Core.Parser;
- using Edge.Core.Parser.BinaryParser.MessageEntity;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Logging.Abstractions;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- namespace Edge.Core.Processor
- {
- public class Outgoing<TRaw, TMessage> : IOutgoing<TRaw, TMessage> where TMessage : MessageBase
- {
- //static NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Communicator");
- static ILogger logger = NullLogger.Instance;
- protected IIncoming<TMessage> incoming;
- public event EventHandler<OutgoingEventArg<TMessage>> OnWriting;
- public Outgoing(IIncoming<TMessage> incoming)
- {
- this.incoming = incoming;
- }
- public Outgoing(IIncoming<TMessage> incoming, IServiceProvider services)
- {
- this.incoming = incoming;
- if (services != null)
- {
- var loggerFactory = services.GetRequiredService<ILoggerFactory>();
- logger = loggerFactory.CreateLogger("Communicator");
- }
- }
- /// <summary>
- ///
- /// </summary>
- /// <param name="request">request to remote peer, which should trigger a response from remote peer.</param>
- /// <param name="responseCapture">predict of response correlated to request, first is the request, 2nd is the pending for capture response</param>
- /// <param name="callback">when response catpured, or timed, this callback will be called</param>
- /// <param name="timeout">time out, by ms</param>
- public virtual void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
- {
- int isCallbackCalled = 0;
- EventHandler eventHandler = null;
- eventHandler = (s, a) =>
- {
- var incoming = s as Incoming<TMessage>;
- if (responseCapture(request, incoming.Message))
- {
- this.incoming.OnMessageIncoming -= eventHandler;
- if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) == 0)
- {
- this.incoming.DisablePropagate = true;
- try
- {
- callback(request, incoming.Message);
- }
- catch (Exception exx) { logger.LogError("Outgoing.WriteAsync(...,...,...,...) exceptioned in callback, detail: " + exx); }
- }
- }
- };
- this.incoming.OnMessageIncoming += eventHandler;
- var _ = new System.Timers.Timer(timeout);
- _.Elapsed += (__, ___) =>
- {
- _.Stop();
- this.incoming.OnMessageIncoming -= eventHandler;
- if (Interlocked.CompareExchange(ref isCallbackCalled, 1, 0) == 0)
- {
- callback(request, null);
- }
- };
- _.Start();
- var safe = this.OnWriting;
- safe?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = request });
- }
- public virtual Task<TMessage> WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, int timeout)
- {
- var tcs = new TaskCompletionSource<TMessage>();
- this.WriteAsync(request, responseCapture, (oriRequest, response) =>
- {
- if (response != null)
- tcs.SetResult(response);
- else
- tcs.SetResult(null);
- }, timeout);
- return tcs.Task;
- }
- public virtual void Write(TMessage message)
- {
- var safe = this.OnWriting;
- safe?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message });
- }
- /// <summary>
- ///
- /// </summary>
- /// <param name="message"></param>
- /// <param name="redirectParameter"></param>
- public virtual void Write(TMessage message, object extraControlParameter)
- {
- var safe = this.OnWriting;
- safe?.Invoke(this, new OutgoingEventArg<TMessage>() { Message = message, ExtraControlParameter = extraControlParameter });
- }
- }
- }
|