using Edge.Core.Parser;
using Edge.Core.Parser.BinaryParser.MessageEntity;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Timer = System.Timers.Timer;
namespace Edge.Core.Processor
{
    /// <summary>
    /// Outgoing channel that works like a ping-pong game: messages handed to <c>Write</c>/<c>WriteAsync</c>
    /// are only queued, and one outgoing message is actually written each time an incoming message arrives
    /// (after waiting <see cref="SinkTime"/> milliseconds). When nothing is queued, the message produced by
    /// <see cref="DefaultOutgoingMsgProducer"/> is written instead.
    /// </summary>
    /// <typeparam name="TRaw">Raw data type, passed through to the base outgoing type.</typeparam>
    /// <typeparam name="TMessage">Message type carried by this channel.</typeparam>
    public class TimeWindowWithNegativePollingOutgoing<TRaw, TMessage> : Outgoing<TRaw, TMessage> where TMessage : MessageBase
    {
        /// <summary>
        /// Maximum number of pending entries; a write is rejected once this many entries are waiting.
        /// </summary>
        private const int maxQueueLength = 50;

        /// <summary>
        /// Pending writes in arrival order. An entry is either a bare <typeparamref name="TMessage"/>,
        /// a 4-item tuple holding the <c>WriteAsync</c> arguments, or a 2-item tuple holding the
        /// arguments of the <c>Write(message, extraControlParameter)</c> overload.
        /// </summary>
        private readonly ConcurrentQueue<object> outgoingMessageQueue = new ConcurrentQueue<object>();

        /// <summary>
        /// How long (in milliseconds) to wait after an incoming message before the responding
        /// (to device) message is sent out. Defaults to 100ms.
        /// </summary>
        public int SinkTime { get; set; } = 100;

        /// <summary>
        /// Dedicated worker thread that performs the actual writes.
        /// </summary>
        private readonly Thread guaranteeOnTimeRespondingThread;

        public enum DeliveryMode
        {
            /// <summary>
            /// Only the last message queued by Write(...) is guaranteed to be sent in the next cycle;
            /// older pending messages are dropped.
            /// </summary>
            OnlyLastWrite,
            /// <summary>
            /// Every message queued by Write(...) is guaranteed to be sent, one per cycle, in the order queued.
            /// </summary>
            AllWrite
        }

        /// <summary>
        /// Gets or sets how pending messages are delivered. Defaults to <see cref="DeliveryMode.OnlyLastWrite"/>.
        /// </summary>
        public DeliveryMode Mode { get; set; } = DeliveryMode.OnlyLastWrite;

        /// <summary>
        /// Signalled by the incoming-message handler; the worker thread waits on it and resets it
        /// at the end of every cycle.
        /// </summary>
        private readonly ManualResetEvent blocker = new ManualResetEvent(false);

        /// <summary>
        /// Gets or sets the producer of the default responding (outgoing to device) message.
        /// It is called with the current incoming message and the last message written from the queue,
        /// and its result is sent when a cycle finds no pending message.
        /// When it is not set, or it returns null, nothing is sent for that cycle.
        /// </summary>
        public Func<TMessage, TMessage, TMessage> DefaultOutgoingMsgProducer { get; set; }

        /// <summary>
        /// Last message taken from the pending queue and written; only touched by the worker thread.
        /// </summary>
        private TMessage lastWriteMsg;

        /// <summary>
        /// Creates the channel, starts the worker thread and subscribes to <paramref name="incoming"/>
        /// so that every incoming message triggers one outgoing cycle.
        /// </summary>
        /// <param name="incoming">Incoming side whose messages trigger the outgoing writes.</param>
        /// <param name="services">Service provider forwarded to the base class.</param>
        public TimeWindowWithNegativePollingOutgoing(IIncoming<TMessage> incoming, IServiceProvider services) : base(incoming, services)
        {
            this.guaranteeOnTimeRespondingThread = new Thread(() => this.RespondingLoop(incoming))
            {
                // The loop never ends on its own; a background thread does not keep the process alive.
                IsBackground = true,
                Name = "TimeWindowWithNegativePollingOutgoing"
            };
            this.guaranteeOnTimeRespondingThread.Start();
            incoming.OnMessageIncoming += (_, __) =>
            {
                this.blocker.Set();
            };
        }

        /// <summary>
        /// Worker loop: wait for an incoming message, wait <see cref="SinkTime"/>, then write either one
        /// pending message or the default message.
        /// </summary>
        private void RespondingLoop(IIncoming<TMessage> incoming)
        {
            while (true)
            {
                try
                {
                    this.blocker.WaitOne();
                    Thread.Sleep(this.SinkTime);
                    if (this.TryTakePending(out object pendingForWrite))
                    {
                        this.WritePending(pendingForWrite);
                    }
                    else
                    {
                        this.WriteDefault(incoming);
                    }
                }
                finally
                {
                    // Signals raised while this cycle was in progress are discarded here.
                    this.blocker.Reset();
                }
            }
        }

        /// <summary>
        /// Takes the entry to write in this cycle: the oldest pending one in <see cref="DeliveryMode.AllWrite"/>,
        /// or the newest one (dropping everything older) in <see cref="DeliveryMode.OnlyLastWrite"/>.
        /// </summary>
        private bool TryTakePending(out object pending)
        {
            if (!this.outgoingMessageQueue.TryDequeue(out pending))
            {
                return false;
            }

            if (this.Mode == DeliveryMode.OnlyLastWrite)
            {
                // Drain the queue so only the most recently queued entry survives.
                while (this.outgoingMessageQueue.TryDequeue(out object newer))
                {
                    pending = newer;
                }
            }

            return true;
        }

        /// <summary>
        /// Forwards a pending entry to the matching base write overload and remembers its message.
        /// </summary>
        private void WritePending(object pendingForWrite)
        {
            if (pendingForWrite is TMessage msg)
            {
                base.Write(msg);
                this.lastWriteMsg = msg;
            }
            else if (pendingForWrite is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncArgs)
            {
                base.WriteAsync(asyncArgs.Item1, asyncArgs.Item2, asyncArgs.Item3, asyncArgs.Item4);
                this.lastWriteMsg = asyncArgs.Item1;
            }
            else if (pendingForWrite is Tuple<TMessage, object> writeWith2Parameter)
            {
                base.Write(writeWith2Parameter.Item1, writeWith2Parameter.Item2);
                this.lastWriteMsg = writeWith2Parameter.Item1;
            }
        }

        /// <summary>
        /// Writes the default responding message, if a producer is configured and yields one.
        /// </summary>
        private void WriteDefault(IIncoming<TMessage> incoming)
        {
            // The producer is set after construction, so an incoming message may arrive before it exists.
            var producer = this.DefaultOutgoingMsgProducer;
            if (producer == null)
            {
                return;
            }

            var defaultMsg = producer(incoming.Message, this.lastWriteMsg);
            if (defaultMsg != null)
            {
                base.Write(defaultMsg);
            }
        }

        /// <summary>
        /// Adds an entry to the pending queue, rejecting it when the queue is already full.
        /// The limit is checked before enqueueing so a rejected entry is not left in the queue.
        /// The check and the enqueue are not atomic, so concurrent writers may briefly exceed the limit.
        /// </summary>
        /// <exception cref="ArgumentException">The queue already holds the maximum number of entries.</exception>
        private void EnqueuePending(object pending)
        {
            if (this.outgoingMessageQueue.Count >= maxQueueLength)
            {
                throw new ArgumentException("Max message queue length: " + maxQueueLength + " reached");
            }

            this.outgoingMessageQueue.Enqueue(pending);
        }

        #region Write functions

        /// <summary>
        /// Queues the message; the actual write is performed on the worker thread.
        /// </summary>
        /// <param name="message">Message to send in a later cycle.</param>
        /// <exception cref="ArgumentException">The pending queue is full.</exception>
        public override void Write(TMessage message)
        {
            this.EnqueuePending(message);
        }

        /// <summary>
        /// Queues the request together with its arguments; the actual base <c>WriteAsync</c> call is
        /// performed on the worker thread.
        /// </summary>
        /// <param name="request">Message to send in a later cycle.</param>
        /// <param name="responseCapture">Forwarded unchanged to the base <c>WriteAsync</c>.</param>
        /// <param name="callback">Forwarded unchanged to the base <c>WriteAsync</c>.</param>
        /// <param name="timeout">Forwarded unchanged to the base <c>WriteAsync</c>.</param>
        /// <exception cref="ArgumentException">The pending queue is full.</exception>
        public override void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
        {
            this.EnqueuePending(new Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>(request, responseCapture, callback, timeout));
        }

        /// <summary>
        /// Queues the message together with its control parameter; the actual write is performed on
        /// the worker thread.
        /// </summary>
        /// <param name="message">Message to send in a later cycle.</param>
        /// <param name="extraControlParameter">Forwarded unchanged to the base <c>Write</c> overload.</param>
        /// <exception cref="ArgumentException">The pending queue is full.</exception>
        public override void Write(TMessage message, object extraControlParameter)
        {
            this.EnqueuePending(new Tuple<TMessage, object>(message, extraControlParameter));
        }

        #endregion
    }
}