using Edge.Core.Parser;
using Edge.Core.Parser.BinaryParser.MessageEntity;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Timer = System.Timers.Timer;
namespace Edge.Core.Processor
{
/// <summary>
/// Ping-pong style outgoing channel: a write towards the device is only performed
/// in response to an incoming message. Messages handed to the <c>Write</c> overloads
/// in between are queued and wait for the next incoming message.
/// When an incoming message arrives and nothing is queued, the message produced by
/// <see cref="DefaultOutgoingMsgProducer"/> is written instead.
/// </summary>
/// <typeparam name="TRaw">Raw data type of the base <c>Outgoing</c>; not used directly by this class.</typeparam>
/// <typeparam name="TMessage">Parsed message type that is queued and written.</typeparam>
public class TimeWindowWithNegativePollingOutgoing<TRaw, TMessage> : Outgoing<TRaw, TMessage> where TMessage : MessageBase
{
    /// <summary>
    /// Upper bound of pending entries; the Write overloads throw once it is reached.
    /// </summary>
    private const int maxQueueLength = 50;

    // A stack, so the most recently queued entry is the one popped by the worker.
    // NOTE(review): in AllWrite mode this means newest-first delivery - confirm that
    // ordering is what the device protocol expects.
    private readonly ConcurrentStack<object> outgoingMessageQueue = new ConcurrentStack<object>();

    // Signalled by the incoming-message handler, reset by the worker after each cycle.
    private readonly ManualResetEvent blocker = new ManualResetEvent(false);

    private readonly Thread guaranteeOnTimeRespondingThread;

    // Last message taken from the queue and written; default responses do not update it.
    private TMessage lastWriteMsg;

    /// <summary>
    /// When a message comes in, how long (in milliseconds) to wait before the responding
    /// (to device) message is sent out. Defaults to 100 ms.
    /// </summary>
    public int SinkTime { get; set; } = 100;

    /// <summary>
    /// Controls what happens to the remaining queued messages after one was sent.
    /// </summary>
    public enum DeliveryMode
    {
        /// <summary>
        /// Only the last message passed to Write(...) (and queued here) is guaranteed
        /// to be sent in the next cycle; the rest of the queue is discarded.
        /// </summary>
        OnlyLastWrite,

        /// <summary>
        /// All messages passed to Write(...) (and queued here) are guaranteed to be
        /// sent, one per cycle, in the following cycles.
        /// </summary>
        AllWrite
    }

    /// <summary>
    /// Gets or sets the delivery mode. Defaults to <see cref="DeliveryMode.OnlyLastWrite"/>.
    /// </summary>
    public DeliveryMode Mode { get; set; } = DeliveryMode.OnlyLastWrite;

    /// <summary>
    /// Gets or sets the producer of the default responding (outgoing to device) message.
    /// It is called with the current incoming message and the last queued message that
    /// was written, and its result is sent when an incoming message arrives while
    /// nothing is queued. When it is <c>null</c> no default response is sent.
    /// </summary>
    public Func<TMessage, TMessage, TMessage> DefaultOutgoingMsgProducer { get; set; }

    /// <summary>
    /// Creates the outgoing channel, starts the responding worker thread and subscribes
    /// to <c>incoming.OnMessageIncoming</c> so that every incoming message wakes the worker.
    /// </summary>
    /// <param name="incoming">Incoming channel whose messages trigger the outgoing writes.</param>
    /// <param name="services">Service provider forwarded to the base class.</param>
    public TimeWindowWithNegativePollingOutgoing(IIncoming<TMessage> incoming, IServiceProvider services) : base(incoming, services)
    {
        this.guaranteeOnTimeRespondingThread = new Thread(() => this.RespondingLoop(incoming))
        {
            // The loop never ends on its own; a foreground thread would keep the
            // process alive forever after the host has shut down.
            IsBackground = true
        };
        this.guaranteeOnTimeRespondingThread.Start();
        incoming.OnMessageIncoming += (_, __) => this.blocker.Set();
    }

    /// <summary>
    /// Worker loop: waits for an incoming message, sleeps <see cref="SinkTime"/> and then
    /// writes either the newest queued entry or the default response.
    /// </summary>
    private void RespondingLoop(IIncoming<TMessage> incoming)
    {
        while (true)
        {
            try
            {
                this.blocker.WaitOne();
                Thread.Sleep(this.SinkTime);
                if (this.outgoingMessageQueue.TryPop(out object pendingForWrite))
                {
                    this.WritePending(pendingForWrite);
                    if (this.Mode == DeliveryMode.OnlyLastWrite)
                    {
                        this.outgoingMessageQueue.Clear();
                    }
                }
                else
                {
                    // No producer configured means there is nothing to answer with.
                    var producer = this.DefaultOutgoingMsgProducer;
                    if (producer != null)
                    {
                        base.Write(producer(incoming.Message, this.lastWriteMsg));
                    }
                }
            }
            catch (Exception ex)
            {
                // An exception escaping this thread would terminate the whole process;
                // report it and keep answering subsequent incoming messages.
                System.Diagnostics.Trace.TraceError($"Responding cycle failed: {ex}");
            }
            finally
            {
                // NOTE(review): signals raised while this cycle was sleeping/writing are
                // dropped here, so several incoming messages within one cycle produce a
                // single response - confirm this is intended.
                this.blocker.Reset();
            }
        }
    }

    /// <summary>
    /// Forwards a queued entry to the matching base write overload and remembers the
    /// message as the last one written. Entries of an unknown shape are ignored.
    /// </summary>
    private void WritePending(object pendingForWrite)
    {
        switch (pendingForWrite)
        {
            case TMessage msg:
                base.Write(msg);
                this.lastWriteMsg = msg;
                break;
            case Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncWrite:
                base.WriteAsync(asyncWrite.Item1, asyncWrite.Item2, asyncWrite.Item3, asyncWrite.Item4);
                this.lastWriteMsg = asyncWrite.Item1;
                break;
            case Tuple<TMessage, object> writeWith2Parameter:
                base.Write(writeWith2Parameter.Item1, writeWith2Parameter.Item2);
                this.lastWriteMsg = writeWith2Parameter.Item1;
                break;
        }
    }

    /// <summary>
    /// Adds an entry to the pending queue, rejecting it when the queue is already full.
    /// </summary>
    /// <exception cref="ArgumentException">The queue already holds the maximum number of entries.</exception>
    private void EnqueueForWrite(object pendingForWrite)
    {
        // Check before pushing: a rejected entry must not be stored, otherwise the
        // queue keeps growing past the limit while every caller gets an exception.
        if (this.outgoingMessageQueue.Count >= maxQueueLength)
        {
            throw new ArgumentException($"Max message queue length: {maxQueueLength} reached");
        }

        this.outgoingMessageQueue.Push(pendingForWrite);
    }

    #region Write functions

    /// <summary>
    /// Queues the message; the actual write is performed on another thread.
    /// </summary>
    /// <param name="message">Message to send in a following cycle.</param>
    /// <exception cref="ArgumentException">The pending queue is full.</exception>
    public override void Write(TMessage message)
    {
        this.EnqueueForWrite(message);
    }

    /// <summary>
    /// Queues the request together with its response handling arguments; the actual
    /// write is performed on another thread by the base <c>WriteAsync</c>.
    /// </summary>
    /// <param name="request">Message to send in a following cycle.</param>
    /// <param name="responseCapture">Forwarded unchanged to the base <c>WriteAsync</c>.</param>
    /// <param name="callback">Forwarded unchanged to the base <c>WriteAsync</c>.</param>
    /// <param name="timeout">Forwarded unchanged to the base <c>WriteAsync</c>.</param>
    /// <exception cref="ArgumentException">The pending queue is full.</exception>
    public override void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
    {
        this.EnqueueForWrite(Tuple.Create(request, responseCapture, callback, timeout));
    }

    /// <summary>
    /// Queues the message with its extra control parameter; the actual write is
    /// performed on another thread.
    /// </summary>
    /// <param name="message">Message to send in a following cycle.</param>
    /// <param name="extraControlParameter">Forwarded unchanged to the base <c>Write</c>.</param>
    /// <exception cref="ArgumentException">The pending queue is full.</exception>
    public override void Write(TMessage message, object extraControlParameter)
    {
        this.EnqueueForWrite(Tuple.Create(message, extraControlParameter));
    }

    #endregion
}
}