using Edge.Core.Parser;
using Edge.Core.Parser.BinaryParser.MessageEntity;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Timer = System.Threading.Timer;
using System.Collections.Concurrent;
namespace Edge.Core.Processor
{
    /// <summary>
    /// An outgoing pipe that paces writes through a time window: every message handed to
    /// <c>Write(...)</c>/<c>WriteAsync(...)</c> is queued first, and a dedicated polling thread forwards
    /// at most one queued item to the base implementation per <see cref="PollingInterval"/>.
    /// When the queue is empty at a tick, the message returned by <see cref="PollingMsgProducer"/> (if any)
    /// is written instead.
    /// NOTE, the queue is bounded: once it holds more than <c>maxQueueLength</c> items, the oldest third is
    /// discarded (and logged), so callers must tolerate dropped messages under overload.
    /// </summary>
    /// <typeparam name="TRaw">Raw data type, passed through to the base outgoing.</typeparam>
    /// <typeparam name="TMessage">Message type that is queued and written.</typeparam>
    public class TimeWindowWithActivePollingOutgoing<TRaw, TMessage> : Outgoing<TRaw, TMessage>, IDisposable where TMessage : MessageBase
    {
        static NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Communicator");

        /// <summary>
        /// I've seen a case where the underlying communicator suddenly went mute for seconds (maybe the remote device got stuck?),
        /// and later a bunch of bytes flushed in, got deserialized into messages and were all piped into handlers,
        /// which made the handlers call Write(...) for a bunch of responses and thus filled this queue very quickly.
        /// </summary>
        private const int maxQueueLength = 200;

        /// <summary>
        /// Queue length above which an informational log entry is written, and also the number of (oldest)
        /// items discarded when the queue overflows.
        /// </summary>
        private const int oneThirdOfQueue = maxQueueLength / 3;

        /// <summary>
        /// Holds one of: a bare TMessage, a 4-tuple describing a WriteAsync request, or a 2-tuple describing
        /// a Write with an extra control parameter.
        /// </summary>
        private readonly ConcurrentQueue<object> outgoingMessageQueue = new ConcurrentQueue<object>();

        // A dedicated thread (rather than a timer or tasks) is used to keep the polling interval accurate.
        private readonly Thread pollingThread;

        // Written by Dispose() and read by the polling thread, hence volatile.
        private volatile bool disposed = false;

        /// <summary>
        /// Optional factory for the message written when the queue is empty at a polling tick.
        /// May be null, and may return null to skip the tick.
        /// </summary>
        public Func<TMessage> PollingMsgProducer { get; set; }

        /// <summary>
        /// by ms, Gets the polling interval of the internal polling thread.
        /// </summary>
        public int PollingInterval { get; }

        /// <summary>
        /// Creates the outgoing and immediately starts its polling thread.
        /// </summary>
        /// <param name="incoming">Incoming pipe, passed to the base class.</param>
        /// <param name="activelyPollingInterval">Polling interval, by ms.</param>
        /// <param name="services">Service provider, passed to the base class.</param>
        public TimeWindowWithActivePollingOutgoing(IIncoming<TMessage> incoming, int activelyPollingInterval, IServiceProvider services) : base(incoming, services)
        {
            this.PollingInterval = activelyPollingInterval;

            // The loop must be synchronous: an async lambda passed to Thread becomes `async void`, so the
            // thread would end at its first await and the loop would hop between thread-pool threads.
            this.pollingThread = new Thread(this.PollingLoop)
            {
                // Background, so a forgotten Dispose() cannot keep the process alive.
                IsBackground = true,
                Name = "TimeWindowWithActivePollingOutgoing.Polling",
            };
            this.pollingThread.Start();
        }

        /// <summary>
        /// Body of the polling thread: sleeps one interval, then sends one item, until disposed.
        /// </summary>
        private void PollingLoop()
        {
            while (!this.disposed)
            {
                Thread.Sleep(this.PollingInterval);

                // Dispose() may have been called while sleeping; do not send anything afterwards.
                if (this.disposed)
                {
                    break;
                }

                try
                {
                    this.SendNextOrPoll();
                }
                catch (Exception ex)
                {
                    // One failed write must not terminate the polling thread (or the process).
                    logger.Error("TimeWindowWithActivePollingOutgoing, polling thread failed to write a message and will continue with next one: " + ex);
                }
            }
        }

        /// <summary>
        /// Sends the oldest queued item through the matching base write method, or, when the queue is empty,
        /// writes the message produced by <see cref="PollingMsgProducer"/> (if any).
        /// </summary>
        private void SendNextOrPoll()
        {
            if (!this.outgoingMessageQueue.TryDequeue(out object pendingForWrite))
            {
                // Capture once so a concurrent reset of the property cannot slip between check and call.
                var producer = this.PollingMsgProducer;
                if (producer != null)
                {
                    var pollingMsg = producer();
                    if (pollingMsg != null)
                    {
                        base.Write(pollingMsg);
                    }
                }

                return;
            }

            if (pendingForWrite is TMessage plainMessage)
            {
                base.Write(plainMessage);
            }
            else if (pendingForWrite is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncRequest)
            {
                base.WriteAsync(asyncRequest.Item1, asyncRequest.Item2, asyncRequest.Item3, asyncRequest.Item4);
            }
            else if (pendingForWrite is Tuple<TMessage, object> controlledMessage)
            {
                base.Write(controlledMessage.Item1, controlledMessage.Item2);
            }
        }

        /// <summary>
        /// Returns the log string of the message carried by a queued item, or null when the item carries none.
        /// </summary>
        private static string DescribeQueuedItem(object queuedItem)
        {
            if (queuedItem is TMessage plainMessage)
            {
                return plainMessage.ToLogString();
            }

            if (queuedItem is Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int> asyncRequest)
            {
                return asyncRequest.Item1?.ToLogString();
            }

            if (queuedItem is Tuple<TMessage, object> controlledMessage)
            {
                return controlledMessage.Item1?.ToLogString();
            }

            return null;
        }

        /// <summary>
        /// Called after every enqueue. Above the max length it dumps the queue to the log and discards the
        /// oldest third; above one third of the max length it only writes an informational log entry.
        /// </summary>
        private void HandleQueueMaxLengthReachedError()
        {
            int queuedCount = this.outgoingMessageQueue.Count;

            // The larger threshold must be tested first, otherwise the overflow branch is unreachable.
            if (queuedCount > maxQueueLength)
            {
                /* let's dump inside elements (in queue order) to see what happened.*/
                string logStr = string.Join(
                    "\r\n====next=======\r\n",
                    this.outgoingMessageQueue.Select(DescribeQueuedItem).Where(s => s != null));

                /*I have no better way to handle this case but clear 1/3 of(oldest items) the queue*/
                for (int i = 0; i < oneThirdOfQueue; i++)
                {
                    this.outgoingMessageQueue.TryDequeue(out _);
                }

                logger.Error("TimeWindowWithActivePollingOutgoing, Will clear 1/3 of the queue since Max message queue length: " + maxQueueLength
                    + " reached, elements are: " + logStr);
            }
            else if (queuedCount > oneThirdOfQueue)
            {
                logger.Info("TimeWindowWithActivePollingOutgoing, queue reached its 1/3 capacity, just inform and will do nothing.");
            }
        }

        /// <summary>
        /// Write a msg to queue, and actual write action will be performed on the polling thread.
        /// </summary>
        /// <param name="message">Message to queue.</param>
        public override void Write(TMessage message)
        {
            this.outgoingMessageQueue.Enqueue(message);
            this.HandleQueueMaxLengthReachedError();
        }

        /// <summary>
        /// Queues a request; the actual base WriteAsync call will be performed on the polling thread.
        /// </summary>
        /// <param name="request">Request message to queue.</param>
        /// <param name="responseCapture">Passed through unchanged to the base WriteAsync.</param>
        /// <param name="callback">Passed through unchanged to the base WriteAsync.</param>
        /// <param name="timeout">Passed through unchanged to the base WriteAsync.</param>
        public override void WriteAsync(TMessage request, Func<TMessage, TMessage, bool> responseCapture, Action<TMessage, TMessage> callback, int timeout)
        {
            this.outgoingMessageQueue.Enqueue(new Tuple<TMessage, Func<TMessage, TMessage, bool>, Action<TMessage, TMessage>, int>(request, responseCapture, callback, timeout));
            this.HandleQueueMaxLengthReachedError();
        }

        /// <summary>
        /// Queues a message with its control parameter; the actual write will be performed on the polling thread.
        /// </summary>
        /// <param name="message">Message to queue.</param>
        /// <param name="extraControlParameter">Passed through unchanged to the base Write.</param>
        public override void Write(TMessage message, object extraControlParameter)
        {
            this.outgoingMessageQueue.Enqueue(new Tuple<TMessage, object>(message, extraControlParameter));
            this.HandleQueueMaxLengthReachedError();
        }

        /// <summary>
        /// Signals the polling thread to stop. The thread exits after its current sleep or write;
        /// items still queued at that point are not sent.
        /// </summary>
        public void Dispose()
        {
            this.disposed = true;
        }
    }
}