using Edge.Core.Processor;
using Edge.Core.IndustryStandardInterface.Pump;
using Edge.Core.UniversalApi;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.CSharp.Syntax;
using Edge.Core.Configuration;
using Edge.Core.Processor.Dispatcher.Attributes;
using Edge.Core.UniversalApi.CommunicationProvider;
using System.Diagnostics.CodeAnalysis;
using System.Collections.Concurrent;
using MQTTnet.Client;
using MQTTnet.Packets;

namespace Edge.Core.UniversalApi
{
    public class LocalMqttCommunicatorProvider : ICommunicationProvider, IEventThrottler
    {
        protected ILogger logger = NullLogger.Instance;
        //private UniversalApiInvoker universalApiInvoker;
        private IServiceProvider services;
        //private IEnumerable<IProcessor> processors;
        protected IManagedMqttClient mqttClient;

        private class ThrottledEvent
        {
            public string EventName;
            public string EventBatchId;
            public DateTime throttleStartTime;
            public TimeSpan throttlePeriod;
        }

        private ConcurrentDictionary<string, ThrottledEvent> throttledEvents = new ConcurrentDictionary<string, ThrottledEvent>();

        private JsonSerializerOptions jsonSerializerOptions;
        /// <summary>
        /// Tuple is: topic string, UniversalApiInfo, ApiName, topic usage doc string.
        /// </summary>
        private Dictionary<string, Tuple<UniversalApiInfo, string, string>> mqttApiInfos
            = new Dictionary<string, Tuple<UniversalApiInfo, string, string>>();
        internal LocalMqttCommunicatorProvider(IServiceProvider services)
        {
            this.services = services;
            this.jsonSerializerOptions = new JsonSerializerOptions()
            {
                WriteIndented = true,
                PropertyNameCaseInsensitive = true,
            };
            this.jsonSerializerOptions.Converters.Add(new JsonStringEnumConverter());
        }

        /// <summary>
        /// 设备属性调用, 下行, 外部调用 App or Handler 中的Universal Api Property.
        /// </summary>
        private string Topic_PropertyFormatStr
            = "/{sys/}{endpointFullTypeName}/{processorName}/thing/property/{universalApi.MethodName}/+";

        /// <summary>
        /// 设备属性调用, 上行, 回复给外部调用端
        /// </summary>
        private string Topic_PropertyReplyFormatStr
            = "/{sys/}{endpointFullTypeName}/{processorName}/thing/property/{universalApi.MethodName}_reply/+";
        /// <summary>
        /// 设备服务调用, 下行, 外部调用 App or Handler 中的Universal Api method.
        /// Local device service invoked at remote should sub the msg on this topic.
        /// must fill the concrete: deviceName
        /// </summary>
        private string Topic_ServiceFormatStr
            = "/{sys/}{endpointFullTypeName}/{processorName}/thing/service/{universalApi.MethodName}/+";

        /// <summary>
        /// 设备服务调用, 上行, 回复给外部调用端
        /// </summary>
        private string Topic_ServiceReplyFormatStr
            = "/{sys/}{endpointFullTypeName}/{processorName}/thing/service/{universalApi.MethodName}_reply/+";

        /// <summary>
        /// 设备事件上报 上行(Alink JSON)
        /// Local device event fire should publish the msg on this topic.
        /// must fill the concrete: deviceName
        /// </summary>
        private string Topic_EventFormatStr
            = "/{sys/}{endpointFullTypeName}/{processorName}/thing/event/{universalApi.MethodName}/post";

        protected virtual string TopicMaker(string topicFormatStr, IProcessor processor, string universalApiServiceOrPropertyOrEventName)
        {
            var endpointTypeMetaPartsDescriptor = processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType().GetCustomAttributes<MetaPartsDescriptor>().FirstOrDefault();
            if (endpointTypeMetaPartsDescriptor?.IsSystemInternalComponent ?? false)
                topicFormatStr = topicFormatStr.Replace("{sys/}", "sys/");
            else
                topicFormatStr = topicFormatStr.Replace("{sys/}", "");
            return topicFormatStr.Replace("{endpointFullTypeName}", processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType().FullName)
                .Replace("{processorName}", processor.MetaConfigName)
                .Replace("{universalApi.MethodName}", universalApiServiceOrPropertyOrEventName);
        }

        protected virtual ManagedMqttClientOptions GetManagedMqttClientOptions()
        {
            var localMqttServerTcpListeningPort =
                  this.services.GetRequiredService<Configurator>().MetaConfiguration
                      .Parameter?.FirstOrDefault(p => p.Name == "MqttServerTcpListeningPort")?.Value ?? "8388";
            return new ManagedMqttClientOptionsBuilder()
                .WithAutoReconnectDelay(TimeSpan.FromSeconds(30))
                .WithClientOptions(new MqttClientOptionsBuilder()
                .WithClientId("universalApiFccClient_local")
                .WithTcpServer("localhost", int.Parse(localMqttServerTcpListeningPort))
                // last will message is almost meaningless for local mqtt since the client and server are in same process, and in most time they start and stop at the same time.
                //.WithWillMessage(new MQTTnet.MqttApplicationMessage()
                //{
                //    Topic = "mylastwill",
                //    Payload = Encoding.UTF8.GetBytes("I was disconnected from local mqtt server")
                //})
                //.WithTls()
                .Build()).Build();
        }

        public virtual async Task SetupAsync(IEnumerable<IProcessor> processors)
        {
            var loggerFactory = services.GetRequiredService<ILoggerFactory>();
            this.logger = loggerFactory.CreateLogger("Main");

            this.mqttClient?.Dispose();
            this.mqttClient = this.services.GetRequiredService<IManagedMqttClient>();
            var mqttClientOptions = this.GetManagedMqttClientOptions();
            var universalApiInvoker = services.GetRequiredService<UniversalApiInvoker>();
            this.mqttClient.DisconnectedAsync += (arg) =>
            {
                this.logger.LogInformation($"{this.GetType().Name}, Disconnected from mqtt server, ReasonString: {arg.ReasonString ?? ""}, ResultCode: {arg.ConnectResult?.ResultCode}, exception: {arg.Exception?.ToString() ?? ""}");
                return Task.CompletedTask;
            };
            this.mqttClient.ConnectedAsync += (arg) =>
            {
                this.logger.LogInformation($"{this.GetType().Name}, Connected to mqtt server, ResultCode: {arg.ConnectResult?.ResultCode}");
                return Task.CompletedTask;
            };
            this.mqttClient.ConnectingFailedAsync += (arg) =>
            {
                this.logger.LogInformation($"{this.GetType().Name}, Connecting Failed to mqtt server, ResultCode: {arg.ConnectResult?.ResultCode}, exception: {arg.Exception?.ToString() ?? ""}");
                return Task.CompletedTask;
            };
            this.mqttApiInfos.Clear();
            var processorDescriptors = processors.Select(p => p.ProcessorDescriptor())
                .Where(pd => pd.UniversalApiInfos != null && pd.UniversalApiInfos.Any());
            List<string> serviceTopics = new List<string>();
            foreach (var processorDesc in processorDescriptors)
            {
                foreach (var serviceOrPropertyApiInfo in
                    // setup Service Api here
                    processorDesc.UniversalApiInfos.Where(i => i.ServiceApiInfo != null || i.PropertyApiInfo != null))
                {
                    string subscribeTopic;
                    if (serviceOrPropertyApiInfo.ServiceApiInfo != null)
                    {
                        subscribeTopic = this.TopicMaker(this.Topic_ServiceFormatStr,
                              processorDesc.Processor,
                              serviceOrPropertyApiInfo.ServiceApiInfo.Name);
                        this.logger.LogDebug(this.GetType().Name + ", will sub Service stub: " + subscribeTopic);
                        serviceTopics.Add(subscribeTopic);
                        this.mqttApiInfos.Add(subscribeTopic,
                            new Tuple<UniversalApiInfo, string, string>(
                                serviceOrPropertyApiInfo,
                                serviceOrPropertyApiInfo.ServiceApiInfo.Name,
                                //"Publish to this topic for a Service call (replace the '+' with unique id for create calling session)</br>" + 
                                (serviceOrPropertyApiInfo.ApiAttribute.Description ?? "")));

                        var publishTopic_for_serivceReply = this.TopicMaker(this.Topic_ServiceReplyFormatStr,
                            processorDesc.Processor,
                            serviceOrPropertyApiInfo.ServiceApiInfo.Name);
                        this.mqttApiInfos.Add(publishTopic_for_serivceReply,
                            new Tuple<UniversalApiInfo, string, string>(
                                serviceOrPropertyApiInfo,
                                serviceOrPropertyApiInfo.ServiceApiInfo.Name + "_Reply",
                                //"Subscribe to this topic for get Service call result (replace '+' with the id in service call)")+
                                (serviceOrPropertyApiInfo.ApiAttribute.Description ?? "")));
                    }
                    else
                    {
                        subscribeTopic = this.TopicMaker(this.Topic_PropertyFormatStr,
                              processorDesc.Processor,
                              serviceOrPropertyApiInfo.PropertyApiInfo.Name);
                        this.logger.LogDebug(this.GetType().Name + ", will sub Property stub: " + subscribeTopic);
                        serviceTopics.Add(subscribeTopic);
                        this.mqttApiInfos.Add(subscribeTopic,
                            new Tuple<UniversalApiInfo, string, string>(
                                serviceOrPropertyApiInfo,
                                serviceOrPropertyApiInfo.PropertyApiInfo.Name,
                                //"Publish to this topic for a Property call (replace the '+' with unique id for create calling session)</br>" + 
                                (serviceOrPropertyApiInfo.ApiAttribute.Description ?? "")));

                        var publishTopic_for_propertyReply = this.TopicMaker(this.Topic_PropertyReplyFormatStr,
                            processorDesc.Processor,
                            serviceOrPropertyApiInfo.PropertyApiInfo.Name);
                        this.mqttApiInfos.Add(publishTopic_for_propertyReply,
                            new Tuple<UniversalApiInfo, string, string>(
                                serviceOrPropertyApiInfo,
                                serviceOrPropertyApiInfo.PropertyApiInfo.Name + "_Reply",
                                //"Subscribe to this topic for get Property call result (replace '+' with the id in Property call)"+
                                (serviceOrPropertyApiInfo.ApiAttribute.Description ?? "")));
                    }
                }

                foreach (var eventUniversalApiInfo in
                   // setup Event Api here
                   processorDesc.UniversalApiInfos.Where(i => i.EventApiInfo != null))
                {
                    var eventTopic = this.TopicMaker(this.Topic_EventFormatStr,
                        eventUniversalApiInfo.Processor,
                        eventUniversalApiInfo.EventApiInfo?.EventName ?? "");
                    this.mqttApiInfos.Add(eventTopic,
                        new Tuple<UniversalApiInfo, string, string>(
                            eventUniversalApiInfo,
                            eventUniversalApiInfo.EventApiInfo?.EventName ?? "",
                            //"Subscribe to this topic to receive an Event.")
                            eventUniversalApiInfo.ApiAttribute.Description ?? ""
                        ));
                }
            }

            this.mqttClient.ApplicationMessageReceivedAsync += (async e =>
              {
                  var requestContext = new RequestContext(e);
                  //if (this.logger.IsEnabled(LogLevel.Debug))
                  //{
                  //    this.logger.LogDebug($"+ received msg on Topic = {e.ApplicationMessage.Topic}");
                  //    this.logger.LogDebug($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
                  //    this.logger.LogDebug($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
                  //    this.logger.LogDebug($"+ Retain = {e.ApplicationMessage.Retain}");
                  //}

                  //Tuple<UniversalApiInfo, string, string> serviceCallInfo;
                  foreach (var openTopic in this.mqttApiInfos.Keys)
                  {
                      // incoming topic is like: /sys/FdcServerApp/Applications.FDC.FdcServerHostApp/thing/service/GetPumpsLayout/112233
                      string requestingTopicStaticPart = e.ApplicationMessage.Topic.Substring(0, e.ApplicationMessage.Topic.LastIndexOf("/"));
                      string requestingTopicSessionIdPart = e.ApplicationMessage.Topic.Substring(e.ApplicationMessage.Topic.LastIndexOf("/") + 1);
                      if (openTopic.IndexOf(requestingTopicStaticPart) >= 0)
                      {
                          var serviceOrPropertyCallInfo = this.mqttApiInfos[openTopic];
                          string serviceOrPropertyReplyTopic = null;
                          object serviceOrPropertyCallResult = null;
                          try
                          {
                              JsonElement[] inputParameters = null;
                              try
                              {
                                  if (e.ApplicationMessage.Payload == null || !e.ApplicationMessage.Payload.Any())
                                  {
                                  }
                                  else
                                  {
                                      var inputParameter = JsonSerializer.Deserialize<JsonElement>(e.ApplicationMessage.Payload);
                                      if (inputParameter.ValueKind == JsonValueKind.Array)
                                          inputParameters = inputParameter.EnumerateArray().ToArray();
                                      else if (inputParameter.ValueKind == JsonValueKind.Object)
                                          inputParameters = new JsonElement[] { inputParameter };
                                      else if (inputParameter.ValueKind == JsonValueKind.Undefined)
                                      { }
                                      else
                                          inputParameters = new JsonElement[] { inputParameter };
                                  }
                              }
                              catch (Exception eeee)
                              {
                                  //serviceCallResult = new
                                  //{
                                  //    ErrorCode = 400,
                                  //    Message = "Could not parse incoming Mqtt msg Payload to JsonElement[]"
                                  //};
                                  throw new InvalidDataException("Could not parse incoming Mqtt msg Payload as a Json Object or Json Array");
                              }

                              if (serviceOrPropertyCallInfo.Item1.ServiceApiInfo != null)
                              {
                                  serviceOrPropertyReplyTopic = this.TopicMaker(this.Topic_ServiceReplyFormatStr,
                                      serviceOrPropertyCallInfo.Item1.Processor,
                                      serviceOrPropertyCallInfo.Item1.ServiceApiInfo.Name).Replace("+", requestingTopicSessionIdPart);
                                  if (serviceOrPropertyCallInfo.Item1.ServiceApiInfo.GetParameters().Length != (inputParameters?.Length ?? 0))
                                      throw new InvalidDataException("Could not find Universal Api Service in the processor has Method Parameters count == "
                                          + (inputParameters?.Length ?? 0));

                                  var apiParameters = serviceOrPropertyCallInfo.Item1.ServiceApiInfo.GetParameters();
                                  serviceOrPropertyCallResult =
                                          await universalApiInvoker.InvokeUniversalApiServiceAsync(
                                              serviceOrPropertyCallInfo.Item1.Processor,
                                              serviceOrPropertyCallInfo.Item1.ServiceApiInfo.Name,
                                              inputParameters,
                                              requestContext);
                              }
                              else
                              {
                                  serviceOrPropertyReplyTopic = this.TopicMaker(this.Topic_PropertyReplyFormatStr,
                                      serviceOrPropertyCallInfo.Item1.Processor,
                                      serviceOrPropertyCallInfo.Item1.PropertyApiInfo.Name).Replace("+", requestingTopicSessionIdPart);
                                  if (inputParameters != null && inputParameters.Length >= 2)
                                      throw new InvalidDataException("Unexpected too many parameters passed in. when calling a Property, either post a single JsonObject, or a JsonArray with single element of JsonObject");

                                  if (inputParameters == null)
                                  {
                                      /*call 'get'*/
                                      serviceOrPropertyCallResult =
                                          universalApiInvoker.InvokeUniversalApiPropertyGet(
                                              serviceOrPropertyCallInfo.Item1.Processor,
                                              serviceOrPropertyCallInfo.Item1.PropertyApiInfo.Name,
                                              requestContext);
                                  }
                                  else
                                  {
                                      /*call 'set'*/
                                      JsonElement? setValue = null;
                                      if (inputParameters.Length == 0)
                                      {
                                      }
                                      else if (inputParameters.Length >= 2)
                                      {
                                          throw new InvalidDataException("Unexpected too many parameters passed in. when calling for set value for a Property, either post a single JsonObject, or a JsonArray with single element of JsonObject");
                                      }
                                      else
                                          setValue = inputParameters[0];
                                      serviceOrPropertyCallResult =
                                          universalApiInvoker.InvokeUniversalApiPropertySet(
                                              serviceOrPropertyCallInfo.Item1.Processor,
                                              serviceOrPropertyCallInfo.Item1.PropertyApiInfo.Name, setValue, requestContext);
                                  }
                              }
                          }
                          catch (Exception eeee)
                          {
                              serviceOrPropertyCallResult = new
                              {
                                  ErrorCode = 400,
                                  Message = eeee.ToString()
                              };
                          }

                          var serviceCallResultJsonBytes = JsonSerializer.SerializeToUtf8Bytes(serviceOrPropertyCallResult, this.jsonSerializerOptions);
                          await this.mqttClient.EnqueueAsync(new ManagedMqttApplicationMessage()
                          {
                              ApplicationMessage = new MQTTnet.MqttApplicationMessage()
                              {
                                  Topic = serviceOrPropertyReplyTopic,
                                  Payload = serviceCallResultJsonBytes
                              }
                          });
                          break;
                      }
                      else
                      {

                      }
                  }
              });
            await this.mqttClient.SubscribeAsync(serviceTopics.Select(t =>
                new MQTTnet.Packets.MqttTopicFilter()
                {
                    // last part is the id for seperate request/response sessions
                    // missing id part will also trigger the service call, but request/response are not pair guranteed.
                    Topic = t// + "/+"
                }).ToList());
            await this.mqttClient.StartAsync(mqttClientOptions);
        }

        public virtual async Task<bool> RouteEventAsync(IProcessor eventSource, EventDescriptor eventDescriptor)
        {
            if (this.mqttClient == null || !this.mqttClient.IsStarted || !this.mqttClient.IsConnected)
                return false;
            try
            {
                if (this.throttledEvents.Any()
                    && eventDescriptor.Data?.GetType().GetProperty("BatchId") is PropertyInfo batchIdProperty)
                {
                    var batchId = batchIdProperty.GetValue(eventDescriptor.Data)?.ToString();
                    if (!string.IsNullOrEmpty(batchId))
                    {
                        var key = eventSource.MetaConfigName + eventDescriptor.Name + batchId;
                        ThrottledEvent tEvt = null;
                        if (this.throttledEvents.TryGetValue(key, out tEvt))
                        {
                            if (DateTime.Now.Subtract(tEvt.throttleStartTime) > tEvt.throttlePeriod)
                            {
                                this.throttledEvents.TryRemove(key, out _);
                            }
                            else
                            {
                                //throttled
                                return true;
                            }
                        }
                    }
                }

                var processorDescription = eventSource.ProcessorDescriptor();
                var eventApiInfo = processorDescription?.UniversalApiInfos.FirstOrDefault(i => i.EventApiInfo != null && i.EventApiInfo.EventName == eventDescriptor.Name);
                //.DeviceProcessorHandlerOrAppliation.GetType().GetCustomAttributes(typeof(UniversalApiAttribute), true)?.FirstOrDefault();
                if (eventApiInfo == null) return false;
                var eventTopic = this.TopicMaker(this.Topic_EventFormatStr,
                    eventSource,
                    eventDescriptor.Name);
                var eventDataJsonBytes = JsonSerializer.SerializeToUtf8Bytes(eventDescriptor.Data, this.jsonSerializerOptions);
                await this.mqttClient.EnqueueAsync(new ManagedMqttApplicationMessage()
                {
                    ApplicationMessage = new MQTTnet.MqttApplicationMessage()
                    {
                        Topic = eventTopic,
                        Payload = eventDataJsonBytes
                    }
                });
                return true;
            }
            catch
            {
                return false;
            }
        }

        public virtual IEnumerable<UniversalApiInfoDoc> GetApiDocuments()
        {
            //var pretty = "<ul>" + this.mqttApiDescriptions.GroupBy(api =>
            //    api.Value.Item1.DeviceProcessorHandlerOrAppliation.GetType().FullName)
            //    .Select(g => "<li>" + g.Key + "(api count: " + g.Count() + ") <ul>" +
            //        g.Select(i => "<li>" + i.Value.Item2.Name + "</li><p>topic: " + i.Key + "</p><p>" + i.Value.Item3 + "</p>")
            //            .Aggregate((acc, n) => acc + n)
            //    + "</ul><li>")
            //    .Aggregate((acc, n) => acc + Environment.NewLine + n) + "</ul>";
            //return pretty;

            return this.mqttApiInfos.Keys.Select(mqttApiTopic =>
            {
                var apiInfo = this.mqttApiInfos[mqttApiTopic].Item1;
                var doc = new UniversalApiInfoDoc()
                {
                    ProviderType = apiInfo.Processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType().FullName
                        //+ ", " + apiInfo.Processor.MetaConfigName
                        ,
                    BaseCategory = apiInfo.ServiceApiInfo == null ?
                        (apiInfo.PropertyApiInfo == null ? "Event" : "Property") : "Service",
                    ProviderConfigName = apiInfo.Processor.MetaConfigName,
                    ApiName = this.mqttApiInfos[mqttApiTopic].Item2,
                    Path = mqttApiTopic,
                    Description = this.mqttApiInfos[mqttApiTopic].Item3,
                    InputParametersExampleJson = apiInfo.ApiAttribute.InputParametersExampleJson //adding dynamic generate one by schema like webapi,
                    //InputParameters = apiInfo.ServiceApiInfo == null ?
                    //   // for event, providing the EventDataType
                    //   (apiInfo.PropertyApiInfo == null ?
                    //        (apiInfo.EventApiInfo?.EventDataType?.ToString() ?? "")
                    //            : apiInfo.PropertyApiInfo.PropertyType.ToString()) :
                    //               // for service, providing the parameter list
                    //               apiInfo.ServiceApiInfo.GetParameters()?.Select(p => p.ParameterType.Name + " " + (p.Name ?? ""))?.Aggregate("", (acc, n) => (string.IsNullOrEmpty(acc) ? "" : (acc + ", ")) + n) ?? "",
                };
                var rawTags = new Type[] { this.mqttApiInfos[mqttApiTopic].Item1.Processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType() }.ExtractProcessorMetaDescriptor(true).FirstOrDefault()?.Tags;
                if (rawTags != null)
                {
                    var localizedTags = new List<string>();
                    foreach (var t in rawTags)
                        localizedTags.Add(t.LocalizedContent("en"));
                    doc.ProviderTags = localizedTags.ToArray();
                }

                string[] inputParametersJsonSchemaStrings = null;
                string outputParameterJsonSchema = null;
                if (apiInfo.ServiceApiInfo != null)
                {
                    inputParametersJsonSchemaStrings = apiInfo.ServiceApiInfo.ResolveParamsJsonSchemas().ToArray();
                    outputParameterJsonSchema = apiInfo.ServiceApiInfo.ReturnType.GetGenericArguments().FirstOrDefault()?.ResolveParamsJsonSchema();
                }
                else if (apiInfo.PropertyApiInfo != null)
                {
                    if (!apiInfo.PropertyApiInfo.CanWrite
                               || apiInfo.PropertyApiInfo.SetMethod == null
                               || (apiInfo.PropertyApiInfo.SetMethod?.IsPrivate ?? false))
                    {
                        // indicates this Property is read-only
                        inputParametersJsonSchemaStrings = null;
                    }
                    else
                        inputParametersJsonSchemaStrings = new string[] { apiInfo.PropertyApiInfo.ResolveParamsJsonSchemas() };
                    outputParameterJsonSchema = apiInfo.PropertyApiInfo.ResolveParamsJsonSchemas();
                }
                else if (apiInfo.EventApiInfo != null)
                {
                    outputParameterJsonSchema = apiInfo.EventApiInfo.EventDataType?.ResolveParamsJsonSchema();
                }

                doc.InputParametersJsonSchemaStrings = inputParametersJsonSchemaStrings;
                doc.InputParametersExampleJson = doc.InputParametersExampleJson ?? inputParametersJsonSchemaStrings?.GenerateParametersJsonSample();

                doc.OutputParametersJsonSchema = outputParameterJsonSchema;
                doc.OutputParametersExampleJson = doc.OutputParametersExampleJson ?? outputParameterJsonSchema?.GenerateParameterJsonSample();

                return doc;
            });
        }

        public virtual void Dispose()
        {
            this.throttledEvents.Clear();
            this.mqttClient?.StopAsync();
        }

        public void Throttle(string processorMetaConfigName, string eventName, string eventBatchId, TimeSpan? period)
        {
            var key = processorMetaConfigName + eventName + eventBatchId;
            if (period == null)
            {
                this.throttledEvents.TryRemove(key, out _);
                return;
            }

            this.throttledEvents.AddOrUpdate(key, new ThrottledEvent()
            {
                EventName = eventName,
                EventBatchId = eventBatchId,
                throttlePeriod = period.Value,
                throttleStartTime = DateTime.Now
            }, (k, exist) => { exist.throttlePeriod = period.Value; exist.throttleStartTime = DateTime.Now; return exist; });
        }
    }
}