using Edge.Core.Parser;
using Edge.Core.Parser.BinaryParser.MessageEntity;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Timer = System.Threading.Timer;
using System.Collections.Concurrent;
namespace Edge.Core.Processor
{
///
/// With a build in timer to timely send msg from queue,
/// so every messages written will be firstly queued here and then delayed to send.
/// NOTE,
///
///
///
public class TimeWindowWithActivePollingOutgoing : Outgoing, IDisposable where TMessage : MessageBase
{
static NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Communicator");
//use dedicated thread for make sure the polling interval accurate
private Thread pollingThread;
private bool disposed = false;
///
/// I've see a case that underlying communicator suddenly muted for seconds(maybe remote device got stuck inside??),
/// and later, bunch of bytes flushed in and got deserialzed into Messages and all were piped into Handlers,
/// which surely will trigger the handler frequently call the Write(...) for a bunch of responses, thus full fill this queue very quickly.
///
private const int maxQueueLength = 200;
//private int activelyPollingInterval = 600;
private ConcurrentQueue outgoingMessageQueue = new ConcurrentQueue();
public Func PollingMsgProducer { get; set; }
///
/// by ms, Gets the polling interval for internal polling timer.
///
public int PollingInterval { get; }
//public TimeWindowWithActivePollingOutgoing(IIncoming incoming, int activelyPollingInterval) : base(incoming)
//{
// this.PollingInterval = activelyPollingInterval;
// this.polling = new Timer((__) =>
// {
// var result = this.polling.Change(Timeout.Infinite, Timeout.Infinite);
// if (!result)
// logger.Warn("Polling timer failed to Change to Infinite!!!");
// if (this.outgoingMessageQueue.TryDequeue(out object pendingForWrite))
// {
// if (pendingForWrite is TMessage)
// {
// base.Write(pendingForWrite as TMessage);
// }
// else if (pendingForWrite is Tuple, Action, int>)
// {
// var ___ = pendingForWrite as Tuple, Action, int>;
// base.WriteAsync(___.Item1, ___.Item2, ___.Item3, ___.Item4);
// }
// else if (pendingForWrite is Tuple)
// {
// var ___ = pendingForWrite as Tuple;
// base.Write(___.Item1, ___.Item2);
// }
// }
// else
// if (this.PollingMsgProducer != null) { var m = this.PollingMsgProducer(); if (m != null) base.Write(m); }
// result = this.polling.Change(activelyPollingInterval, activelyPollingInterval);
// if (!result)
// logger.Warn("Polling timer failed to Change to activelyPollingInterval!!!");
// }, null, 2000, activelyPollingInterval);
//}
public TimeWindowWithActivePollingOutgoing(IIncoming incoming, int activelyPollingInterval, IServiceProvider services) : base(incoming, services)
{
this.PollingInterval = activelyPollingInterval;
this.pollingThread = new Thread(async () =>
{
while (!this.disposed)
{
await Task.Delay(this.PollingInterval);
if (this.outgoingMessageQueue.TryDequeue(out object pendingForWrite))
{
if (pendingForWrite is TMessage)
{
base.Write(pendingForWrite as TMessage);
}
else if (pendingForWrite is Tuple, Action, int>)
{
var ___ = pendingForWrite as Tuple, Action, int>;
base.WriteAsync(___.Item1, ___.Item2, ___.Item3, ___.Item4);
}
else if (pendingForWrite is Tuple)
{
var ___ = pendingForWrite as Tuple;
base.Write(___.Item1, ___.Item2);
}
}
else
if (this.PollingMsgProducer != null) { var m = this.PollingMsgProducer(); if (m != null) base.Write(m); }
}
});
this.pollingThread.Start();
//result = this.polling.Change(activelyPollingInterval, activelyPollingInterval);
//if (!result)
// logger.Warn("Polling timer failed to Change to activelyPollingInterval!!!");
//}, null, 2000, activelyPollingInterval);
}
private void HandleQueueMaxLengthReachedError()
{
if (this.outgoingMessageQueue.Count > maxQueueLength / 3)
{
logger.Info("TimeWindowWithActivePollingOutgoing, queue reached its 1/3 capacity, just inform and will do nothing.");
}
else if (this.outgoingMessageQueue.Count > maxQueueLength)
{
/* let's dump inside elements to see what happened.*/
string logStr = "";
var msgLogStrings = this.outgoingMessageQueue.OfType().Select(s => s.ToLogString());
if (msgLogStrings.Any()) logStr += msgLogStrings.Aggregate((acc, n) => acc + "\r\n====next=======\r\n" + n);
var tuple4Strings = this.outgoingMessageQueue.OfType, Action, int>>().Select(s => s.Item1.ToLogString());
if (tuple4Strings.Any()) logStr += tuple4Strings.Aggregate((acc, n) => acc + "\r\n" + n);
var tuple2Strings = this.outgoingMessageQueue.OfType>().Select(s => s.Item1.ToLogString());
if (tuple2Strings.Any()) logStr += tuple2Strings.Aggregate((acc, n) => acc + "\r\n" + n);
/*I have no better way to handle this case but clear 1/3 of(oldest items) the queue*/
for (int i = 0; i < maxQueueLength / 3; i++)
{
object _;
this.outgoingMessageQueue.TryDequeue(out _);
}
//this.outgoingMessageQueue.Clear();
logger.Error("TimeWindowWithActivePollingOutgoing, Will clear 1/3 of the queue since Max message queue length: " + maxQueueLength
+ " reached, elements are: " + logStr);
}
}
///
/// Write a msg to queue, and actual write action will be performed on another thread.
///
///
public override void Write(TMessage message)
{
this.outgoingMessageQueue.Enqueue(message);
this.HandleQueueMaxLengthReachedError();
}
///
/// actual write action will be performed on another thread.
///
///
///
///
///
public override void WriteAsync(TMessage request, Func responseCapture, Action callback, int timeout)
{
this.outgoingMessageQueue.Enqueue(new Tuple, Action, int>(request, responseCapture, callback, timeout));
this.HandleQueueMaxLengthReachedError();
}
///
/// actual write action will be performed on another thread.
///
///
///
public override void Write(TMessage message, object extraControlParameter)
{
this.outgoingMessageQueue.Enqueue(new Tuple(message, extraControlParameter));
this.HandleQueueMaxLengthReachedError();
}
public void Dispose()
{
this.disposed = true;
}
}
}