123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528 |
- using Edge.Core.Processor;
- using Edge.Core.IndustryStandardInterface.Pump;
- using Edge.Core.UniversalApi;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Logging.Abstractions;
- using MQTTnet.Extensions.ManagedClient;
- using System;
- using System.Collections.Generic;
- using System.IO;
- using System.Linq;
- using System.Reflection;
- using System.Text;
- using System.Text.Json;
- using System.Text.Json.Serialization;
- using System.Threading.Tasks;
- using Microsoft.CodeAnalysis.CSharp.Syntax;
- using Edge.Core.Configuration;
- using Edge.Core.Processor.Dispatcher.Attributes;
- using Edge.Core.UniversalApi.CommunicationProvider;
- using System.Diagnostics.CodeAnalysis;
- using System.Collections.Concurrent;
- using MQTTnet.Client;
- using MQTTnet.Packets;
- namespace Edge.Core.UniversalApi
- {
- public class LocalMqttCommunicatorProvider : ICommunicationProvider, IEventThrottler
- {
/// <summary>
/// Logger used by this provider. Starts as a no-op logger and is replaced
/// with the "Main" logger when SetupAsync runs.
/// </summary>
protected ILogger logger = NullLogger.Instance;

/// <summary>
/// Service provider handed to the constructor; used to resolve the logger factory,
/// the managed mqtt client, the configurator and the Universal Api invoker.
/// </summary>
private readonly IServiceProvider services;

/// <summary>
/// Managed mqtt client used for subscribing to api calls and publishing replies/events.
/// Null until SetupAsync has run.
/// </summary>
protected IManagedMqttClient mqttClient;

/// <summary>
/// Book-keeping entry for one throttled event batch.
/// </summary>
private class ThrottledEvent
{
    /// <summary>Name of the throttled event.</summary>
    public string EventName;

    /// <summary>Batch id of the throttled event.</summary>
    public string EventBatchId;

    /// <summary>Local time at which the throttle period started.</summary>
    public DateTime throttleStartTime;

    /// <summary>How long, counted from <see cref="throttleStartTime"/>, the event stays throttled.</summary>
    public TimeSpan throttlePeriod;
}

/// <summary>
/// Currently throttled events, keyed by processor config name + event name + batch id
/// (plain concatenation).
/// </summary>
private readonly ConcurrentDictionary<string, ThrottledEvent> throttledEvents
    = new ConcurrentDictionary<string, ThrottledEvent>();

/// <summary>
/// Serializer options used for every payload this provider publishes.
/// </summary>
private readonly JsonSerializerOptions jsonSerializerOptions;

/// <summary>
/// All topics known to this provider. The key is the topic string; the tuple holds
/// the UniversalApiInfo, the api name shown in documents, and the description text.
/// </summary>
private readonly Dictionary<string, Tuple<UniversalApiInfo, string, string>> mqttApiInfos
    = new Dictionary<string, Tuple<UniversalApiInfo, string, string>>();
/// <summary>
/// Creates the provider and prepares the json serializer options used for outgoing payloads:
/// indented output, case-insensitive property names and enums written as strings.
/// </summary>
/// <param name="services">Service provider used later to resolve this provider's dependencies.</param>
internal LocalMqttCommunicatorProvider(IServiceProvider services)
{
    var serializerOptions = new JsonSerializerOptions
    {
        PropertyNameCaseInsensitive = true,
        WriteIndented = true,
    };
    serializerOptions.Converters.Add(new JsonStringEnumConverter());

    this.jsonSerializerOptions = serializerOptions;
    this.services = services;
}
/// <summary>
/// Device property call, downstream: an external caller invokes a Universal Api Property
/// exposed by an App or Handler. The trailing '+' level carries the caller's session id.
/// </summary>
private readonly string Topic_PropertyFormatStr
    = "/{sys/}{endpointFullTypeName}/{processorName}/thing/property/{universalApi.MethodName}/+";

/// <summary>
/// Device property call, upstream: the reply sent back to the external caller.
/// </summary>
private readonly string Topic_PropertyReplyFormatStr
    = "/{sys/}{endpointFullTypeName}/{processorName}/thing/property/{universalApi.MethodName}_reply/+";

/// <summary>
/// Device service call, downstream: an external caller invokes a Universal Api method
/// exposed by an App or Handler. The local side subscribes to this topic.
/// The placeholders must be filled with concrete values before use.
/// </summary>
private readonly string Topic_ServiceFormatStr
    = "/{sys/}{endpointFullTypeName}/{processorName}/thing/service/{universalApi.MethodName}/+";

/// <summary>
/// Device service call, upstream: the reply sent back to the external caller.
/// </summary>
private readonly string Topic_ServiceReplyFormatStr
    = "/{sys/}{endpointFullTypeName}/{processorName}/thing/service/{universalApi.MethodName}_reply/+";

/// <summary>
/// Device event report, upstream (Alink JSON): a local event is published on this topic.
/// The placeholders must be filled with concrete values before use.
/// </summary>
private readonly string Topic_EventFormatStr
    = "/{sys/}{endpointFullTypeName}/{processorName}/thing/event/{universalApi.MethodName}/post";
/// <summary>
/// Fills the placeholders of a topic format string for one processor and one api.
/// </summary>
/// <param name="topicFormatStr">
/// Format string containing the placeholders "{sys/}", "{endpointFullTypeName}",
/// "{processorName}" and "{universalApi.MethodName}".
/// </param>
/// <param name="processor">Processor whose handler/app type and config name go into the topic.</param>
/// <param name="universalApiServiceOrPropertyOrEventName">Name of the service, property or event.</param>
/// <returns>The topic string with every placeholder replaced.</returns>
protected virtual string TopicMaker(string topicFormatStr, IProcessor processor, string universalApiServiceOrPropertyOrEventName)
{
    // Resolve the handler/app type once; it is needed both for the attribute lookup and the type name.
    var endpointType = processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType();
    var endpointTypeMetaPartsDescriptor = endpointType.GetCustomAttributes<MetaPartsDescriptor>().FirstOrDefault();

    // Only components flagged as system-internal get the "sys/" topic prefix.
    bool isSystemInternalComponent = endpointTypeMetaPartsDescriptor?.IsSystemInternalComponent ?? false;

    return topicFormatStr
        .Replace("{sys/}", isSystemInternalComponent ? "sys/" : "")
        .Replace("{endpointFullTypeName}", endpointType.FullName)
        .Replace("{processorName}", processor.MetaConfigName)
        .Replace("{universalApi.MethodName}", universalApiServiceOrPropertyOrEventName);
}
/// <summary>
/// Builds the options used to start the managed mqtt client against the mqtt server on localhost.
/// The tcp port is read from the meta configuration parameter "MqttServerTcpListeningPort"
/// and falls back to 8388 when that parameter is absent.
/// </summary>
/// <returns>Managed client options with a 30 second auto-reconnect delay.</returns>
protected virtual ManagedMqttClientOptions GetManagedMqttClientOptions()
{
    var metaConfiguration = this.services.GetRequiredService<Configurator>().MetaConfiguration;
    var portParameter = metaConfiguration.Parameter?.FirstOrDefault(p => p.Name == "MqttServerTcpListeningPort");
    var localMqttServerTcpListeningPort = portParameter?.Value ?? "8388";

    // No last-will message is configured: client and server live in the same process
    // and usually start and stop together, so a will message would be almost meaningless.
    var clientOptions = new MqttClientOptionsBuilder()
        .WithClientId("universalApiFccClient_local")
        .WithTcpServer("localhost", int.Parse(localMqttServerTcpListeningPort))
        .Build();

    return new ManagedMqttClientOptionsBuilder()
        .WithAutoReconnectDelay(TimeSpan.FromSeconds(30))
        .WithClientOptions(clientOptions)
        .Build();
}
/// <summary>
/// Sets up the mqtt side of the Universal Api: resolves the logger and the managed mqtt client,
/// registers the topics of every Universal Api exposed by <paramref name="processors"/>,
/// wires the request handler, subscribes to the service/property topics and starts the client.
/// </summary>
/// <param name="processors">Processors whose Universal Api services, properties and events are exposed.</param>
/// <exception cref="ArgumentNullException"><paramref name="processors"/> is null.</exception>
public virtual async Task SetupAsync(IEnumerable<IProcessor> processors)
{
    if (processors == null)
        throw new ArgumentNullException(nameof(processors));

    var loggerFactory = services.GetRequiredService<ILoggerFactory>();
    this.logger = loggerFactory.CreateLogger("Main");

    // Drop the client of a previous setup before resolving the one for this run.
    this.mqttClient?.Dispose();
    this.mqttClient = this.services.GetRequiredService<IManagedMqttClient>();
    var mqttClientOptions = this.GetManagedMqttClientOptions();
    var universalApiInvoker = services.GetRequiredService<UniversalApiInvoker>();

    this.mqttClient.DisconnectedAsync += (arg) =>
    {
        this.logger.LogInformation($"{this.GetType().Name}, Disconnected from mqtt server, ReasonString: {arg.ReasonString ?? ""}, ResultCode: {arg.ConnectResult?.ResultCode}, exception: {arg.Exception?.ToString() ?? ""}");
        return Task.CompletedTask;
    };
    this.mqttClient.ConnectedAsync += (arg) =>
    {
        this.logger.LogInformation($"{this.GetType().Name}, Connected to mqtt server, ResultCode: {arg.ConnectResult?.ResultCode}");
        return Task.CompletedTask;
    };
    this.mqttClient.ConnectingFailedAsync += (arg) =>
    {
        this.logger.LogInformation($"{this.GetType().Name}, Connecting Failed to mqtt server, ResultCode: {arg.ConnectResult?.ResultCode}, exception: {arg.Exception?.ToString() ?? ""}");
        return Task.CompletedTask;
    };

    var serviceTopics = this.RegisterApiTopics(processors);

    this.mqttClient.ApplicationMessageReceivedAsync += async e =>
    {
        var requestContext = new RequestContext(e);
        await this.HandleApiRequestAsync(e.ApplicationMessage, requestContext, universalApiInvoker);
    };

    // The subscribed topics already end with the '+' level that carries the caller's session id,
    // which is what pairs a request with its reply.
    await this.mqttClient.SubscribeAsync(serviceTopics.Select(t =>
        new MQTTnet.Packets.MqttTopicFilter()
        {
            Topic = t
        }).ToList());
    await this.mqttClient.StartAsync(mqttClientOptions);
}

/// <summary>
/// Rebuilds <c>mqttApiInfos</c> from the Universal Api infos of the given processors.
/// Each service/property contributes a request topic and a reply topic; each event contributes one topic.
/// </summary>
/// <param name="processors">Processors to scan; those without Universal Api infos are skipped.</param>
/// <returns>The service and property request topics that must be subscribed to.</returns>
private List<string> RegisterApiTopics(IEnumerable<IProcessor> processors)
{
    this.mqttApiInfos.Clear();
    var processorDescriptors = processors.Select(p => p.ProcessorDescriptor())
        .Where(pd => pd.UniversalApiInfos != null && pd.UniversalApiInfos.Any());
    var serviceTopics = new List<string>();
    foreach (var processorDesc in processorDescriptors)
    {
        // Services and properties: one topic to receive calls, one to publish the reply.
        foreach (var serviceOrPropertyApiInfo in
            processorDesc.UniversalApiInfos.Where(i => i.ServiceApiInfo != null || i.PropertyApiInfo != null))
        {
            bool isService = serviceOrPropertyApiInfo.ServiceApiInfo != null;
            string apiName = isService
                ? serviceOrPropertyApiInfo.ServiceApiInfo.Name
                : serviceOrPropertyApiInfo.PropertyApiInfo.Name;

            string subscribeTopic = this.TopicMaker(
                isService ? this.Topic_ServiceFormatStr : this.Topic_PropertyFormatStr,
                processorDesc.Processor,
                apiName);
            this.logger.LogDebug(this.GetType().Name
                + (isService ? ", will sub Service stub: " : ", will sub Property stub: ")
                + subscribeTopic);
            serviceTopics.Add(subscribeTopic);
            this.mqttApiInfos.Add(subscribeTopic,
                new Tuple<UniversalApiInfo, string, string>(
                    serviceOrPropertyApiInfo,
                    apiName,
                    serviceOrPropertyApiInfo.ApiAttribute.Description ?? ""));

            string replyTopic = this.TopicMaker(
                isService ? this.Topic_ServiceReplyFormatStr : this.Topic_PropertyReplyFormatStr,
                processorDesc.Processor,
                apiName);
            this.mqttApiInfos.Add(replyTopic,
                new Tuple<UniversalApiInfo, string, string>(
                    serviceOrPropertyApiInfo,
                    apiName + "_Reply",
                    serviceOrPropertyApiInfo.ApiAttribute.Description ?? ""));
        }

        // Events: only published by this provider, so they are documented but not subscribed.
        foreach (var eventUniversalApiInfo in
            processorDesc.UniversalApiInfos.Where(i => i.EventApiInfo != null))
        {
            var eventTopic = this.TopicMaker(this.Topic_EventFormatStr,
                eventUniversalApiInfo.Processor,
                eventUniversalApiInfo.EventApiInfo?.EventName ?? "");
            this.mqttApiInfos.Add(eventTopic,
                new Tuple<UniversalApiInfo, string, string>(
                    eventUniversalApiInfo,
                    eventUniversalApiInfo.EventApiInfo?.EventName ?? "",
                    eventUniversalApiInfo.ApiAttribute.Description ?? ""));
        }
    }

    return serviceTopics;
}

/// <summary>
/// Finds the registered api whose topic, without its last level, equals the given static topic part.
/// The comparison is exact, so an api whose name is a prefix of another api's name
/// (or of its "_reply" topic) can no longer be confused with it.
/// </summary>
/// <param name="requestingTopicStaticPart">Incoming topic without its last (session id) level.</param>
/// <returns>The registered api info, or null when no topic matches.</returns>
private Tuple<UniversalApiInfo, string, string> FindApiInfoByStaticTopic(string requestingTopicStaticPart)
{
    foreach (var openTopic in this.mqttApiInfos)
    {
        int lastLevelIndex = openTopic.Key.LastIndexOf('/');
        if (lastLevelIndex == requestingTopicStaticPart.Length
            && openTopic.Key.Substring(0, lastLevelIndex) == requestingTopicStaticPart)
        {
            return openTopic.Value;
        }
    }

    return null;
}

/// <summary>
/// Converts the payload of an incoming message into the parameter list of an api call.
/// </summary>
/// <param name="requestMessage">The incoming mqtt message.</param>
/// <returns>
/// Null for a missing or empty payload; the array elements for a json array;
/// otherwise a single-element array holding the parsed json value.
/// </returns>
/// <exception cref="InvalidDataException">The payload is not valid json.</exception>
private static JsonElement[] ParseInputParameters(MQTTnet.MqttApplicationMessage requestMessage)
{
    if (requestMessage.Payload == null || !requestMessage.Payload.Any())
        return null;

    try
    {
        var inputParameter = JsonSerializer.Deserialize<JsonElement>(requestMessage.Payload);
        switch (inputParameter.ValueKind)
        {
            case JsonValueKind.Array:
                return inputParameter.EnumerateArray().ToArray();
            case JsonValueKind.Undefined:
                return null;
            default:
                return new JsonElement[] { inputParameter };
        }
    }
    catch (Exception parseError)
    {
        // Keep the original failure as inner exception so the error reply tells the caller what was wrong.
        throw new InvalidDataException("Could not parse incoming Mqtt msg Payload as a Json Object or Json Array", parseError);
    }
}

/// <summary>
/// Handles one incoming service or property call: locates the api by topic, invokes it and
/// publishes the result (or an error object with ErrorCode 400) on the matching reply topic.
/// </summary>
/// <param name="requestMessage">The incoming mqtt message.</param>
/// <param name="requestContext">Request context created from the received-message event args.</param>
/// <param name="universalApiInvoker">Invoker used to execute the service or property call.</param>
private async Task HandleApiRequestAsync(
    MQTTnet.MqttApplicationMessage requestMessage,
    RequestContext requestContext,
    UniversalApiInvoker universalApiInvoker)
{
    // incoming topic is like: /sys/FdcServerApp/Applications.FDC.FdcServerHostApp/thing/service/GetPumpsLayout/112233
    string requestingTopic = requestMessage.Topic;
    int sessionSeparatorIndex = requestingTopic == null ? -1 : requestingTopic.LastIndexOf('/');
    if (sessionSeparatorIndex < 0)
    {
        this.logger.LogDebug(this.GetType().Name + ", ignored a msg whose topic has no session id level: " + requestingTopic);
        return;
    }

    string requestingTopicStaticPart = requestingTopic.Substring(0, sessionSeparatorIndex);
    string requestingTopicSessionIdPart = requestingTopic.Substring(sessionSeparatorIndex + 1);

    var serviceOrPropertyCallInfo = this.FindApiInfoByStaticTopic(requestingTopicStaticPart);
    if (serviceOrPropertyCallInfo == null)
        return;

    var apiInfo = serviceOrPropertyCallInfo.Item1;
    string serviceOrPropertyReplyTopic = null;
    object serviceOrPropertyCallResult = null;
    try
    {
        bool isService = apiInfo.ServiceApiInfo != null;

        // Resolve the reply topic first so that a payload parsing failure can still be reported to the caller.
        serviceOrPropertyReplyTopic = this.TopicMaker(
            isService ? this.Topic_ServiceReplyFormatStr : this.Topic_PropertyReplyFormatStr,
            apiInfo.Processor,
            isService ? apiInfo.ServiceApiInfo.Name : apiInfo.PropertyApiInfo.Name)
            .Replace("+", requestingTopicSessionIdPart);

        JsonElement[] inputParameters = ParseInputParameters(requestMessage);

        if (isService)
        {
            int suppliedParameterCount = inputParameters?.Length ?? 0;
            if (apiInfo.ServiceApiInfo.GetParameters().Length != suppliedParameterCount)
                throw new InvalidDataException("Could not find Universal Api Service in the processor has Method Parameters count == "
                    + suppliedParameterCount);
            serviceOrPropertyCallResult =
                await universalApiInvoker.InvokeUniversalApiServiceAsync(
                    apiInfo.Processor,
                    apiInfo.ServiceApiInfo.Name,
                    inputParameters,
                    requestContext);
        }
        else
        {
            if (inputParameters != null && inputParameters.Length >= 2)
                throw new InvalidDataException("Unexpected too many parameters passed in. when calling a Property, either post a single JsonObject, or a JsonArray with single element of JsonObject");
            if (inputParameters == null)
            {
                // No payload at all means 'get'.
                serviceOrPropertyCallResult =
                    universalApiInvoker.InvokeUniversalApiPropertyGet(
                        apiInfo.Processor,
                        apiInfo.PropertyApiInfo.Name,
                        requestContext);
            }
            else
            {
                // Any payload means 'set'; an empty json array sets a null value.
                JsonElement? setValue = null;
                if (inputParameters.Length == 1)
                    setValue = inputParameters[0];
                serviceOrPropertyCallResult =
                    universalApiInvoker.InvokeUniversalApiPropertySet(
                        apiInfo.Processor,
                        apiInfo.PropertyApiInfo.Name, setValue, requestContext);
            }
        }
    }
    catch (Exception callError)
    {
        serviceOrPropertyCallResult = new
        {
            ErrorCode = 400,
            Message = callError.ToString()
        };
    }

    if (serviceOrPropertyReplyTopic == null)
    {
        // The reply topic itself could not be built, so there is nowhere to publish the error.
        this.logger.LogWarning(this.GetType().Name + ", could not resolve a reply topic for request on: " + requestingTopic);
        return;
    }

    var serviceCallResultJsonBytes = JsonSerializer.SerializeToUtf8Bytes(serviceOrPropertyCallResult, this.jsonSerializerOptions);
    await this.mqttClient.EnqueueAsync(new ManagedMqttApplicationMessage()
    {
        ApplicationMessage = new MQTTnet.MqttApplicationMessage()
        {
            Topic = serviceOrPropertyReplyTopic,
            Payload = serviceCallResultJsonBytes
        }
    });
}
/// <summary>
/// Publishes an event raised by a processor on its event topic.
/// </summary>
/// <param name="eventSource">Processor that raised the event.</param>
/// <param name="eventDescriptor">Name and data of the event; the data is serialized to json as the payload.</param>
/// <returns>
/// True when the event was enqueued for publishing, or deliberately dropped because it is throttled.
/// False when the client is not started/connected, the processor does not declare the event
/// as a Universal Api event, or any failure occurred.
/// </returns>
public virtual async Task<bool> RouteEventAsync(IProcessor eventSource, EventDescriptor eventDescriptor)
{
    if (this.mqttClient == null || !this.mqttClient.IsStarted || !this.mqttClient.IsConnected)
        return false;

    // Captured inside the try block so a failure can still be logged with whatever was readable.
    string sourceName = null;
    string eventName = null;
    try
    {
        sourceName = eventSource.MetaConfigName;
        eventName = eventDescriptor.Name;

        // Throttling only applies to event data that exposes a "BatchId" property.
        if (!this.throttledEvents.IsEmpty
            && eventDescriptor.Data?.GetType().GetProperty("BatchId") is PropertyInfo batchIdProperty)
        {
            var batchId = batchIdProperty.GetValue(eventDescriptor.Data)?.ToString();
            if (!string.IsNullOrEmpty(batchId))
            {
                var key = sourceName + eventName + batchId;
                if (this.throttledEvents.TryGetValue(key, out ThrottledEvent throttled))
                {
                    if (DateTime.Now.Subtract(throttled.throttleStartTime) > throttled.throttlePeriod)
                    {
                        // The throttle period is over: forget the entry and publish as usual.
                        this.throttledEvents.TryRemove(key, out _);
                    }
                    else
                    {
                        // Still throttled: report the event as handled without publishing it.
                        return true;
                    }
                }
            }
        }

        var processorDescription = eventSource.ProcessorDescriptor();
        var eventApiInfo = processorDescription?.UniversalApiInfos?.FirstOrDefault(
            i => i.EventApiInfo != null && i.EventApiInfo.EventName == eventName);
        if (eventApiInfo == null)
            return false;

        var eventTopic = this.TopicMaker(this.Topic_EventFormatStr, eventSource, eventName);
        var eventDataJsonBytes = JsonSerializer.SerializeToUtf8Bytes(eventDescriptor.Data, this.jsonSerializerOptions);
        await this.mqttClient.EnqueueAsync(new ManagedMqttApplicationMessage()
        {
            ApplicationMessage = new MQTTnet.MqttApplicationMessage()
            {
                Topic = eventTopic,
                Payload = eventDataJsonBytes
            }
        });
        return true;
    }
    catch (Exception routeError)
    {
        // Routing stays best-effort (callers only get 'false'), but the cause is no longer lost.
        this.logger.LogWarning(routeError,
            "{ProviderName}, failed to route event '{EventName}' from processor '{ProcessorName}'",
            this.GetType().Name, eventName, sourceName);
        return false;
    }
}
/// <summary>
/// Produces one document per registered mqtt topic (service/property request topics,
/// their reply topics, and event topics).
/// </summary>
/// <returns>
/// A lazily evaluated sequence; each document is built when the sequence is enumerated.
/// </returns>
public virtual IEnumerable<UniversalApiInfoDoc> GetApiDocuments()
{
    foreach (var topicEntry in this.mqttApiInfos)
    {
        var apiInfo = topicEntry.Value.Item1;

        // Resolved once per topic; used for the provider type name and for the tags.
        var providerType = apiInfo.Processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType();

        string baseCategory;
        if (apiInfo.ServiceApiInfo != null)
            baseCategory = "Service";
        else if (apiInfo.PropertyApiInfo != null)
            baseCategory = "Property";
        else
            baseCategory = "Event";

        var doc = new UniversalApiInfoDoc()
        {
            ProviderType = providerType.FullName,
            BaseCategory = baseCategory,
            ProviderConfigName = apiInfo.Processor.MetaConfigName,
            ApiName = topicEntry.Value.Item2,
            Path = topicEntry.Key,
            Description = topicEntry.Value.Item3,
            // An example supplied on the api attribute wins over the generated one below.
            InputParametersExampleJson = apiInfo.ApiAttribute.InputParametersExampleJson,
        };

        var rawTags = new Type[] { providerType }.ExtractProcessorMetaDescriptor(true).FirstOrDefault()?.Tags;
        if (rawTags != null)
        {
            var localizedTags = new List<string>();
            foreach (var t in rawTags)
                localizedTags.Add(t.LocalizedContent("en"));
            doc.ProviderTags = localizedTags.ToArray();
        }

        string[] inputParametersJsonSchemaStrings = null;
        string outputParameterJsonSchema = null;
        if (apiInfo.ServiceApiInfo != null)
        {
            inputParametersJsonSchemaStrings = apiInfo.ServiceApiInfo.ResolveParamsJsonSchemas().ToArray();
            // The output schema comes from the first generic argument of the return type, if any.
            outputParameterJsonSchema = apiInfo.ServiceApiInfo.ReturnType.GetGenericArguments().FirstOrDefault()?.ResolveParamsJsonSchema();
        }
        else if (apiInfo.PropertyApiInfo != null)
        {
            var propertySchema = apiInfo.PropertyApiInfo.ResolveParamsJsonSchemas();
            var setter = apiInfo.PropertyApiInfo.SetMethod;
            bool isReadOnly = !apiInfo.PropertyApiInfo.CanWrite || setter == null || setter.IsPrivate;

            // A read-only property accepts no input, so it gets no input schema.
            if (!isReadOnly)
                inputParametersJsonSchemaStrings = new string[] { propertySchema };
            outputParameterJsonSchema = propertySchema;
        }
        else if (apiInfo.EventApiInfo != null)
        {
            outputParameterJsonSchema = apiInfo.EventApiInfo.EventDataType?.ResolveParamsJsonSchema();
        }

        doc.InputParametersJsonSchemaStrings = inputParametersJsonSchemaStrings;
        doc.InputParametersExampleJson = doc.InputParametersExampleJson ?? inputParametersJsonSchemaStrings?.GenerateParametersJsonSample();
        doc.OutputParametersJsonSchema = outputParameterJsonSchema;
        doc.OutputParametersExampleJson = doc.OutputParametersExampleJson ?? outputParameterJsonSchema?.GenerateParameterJsonSample();

        yield return doc;
    }
}
/// <summary>
/// Clears all throttle entries and asks the mqtt client to stop.
/// The stop is not awaited; a failure of the stop is logged instead of being left unobserved.
/// </summary>
public virtual void Dispose()
{
    this.throttledEvents.Clear();

    var stopTask = this.mqttClient?.StopAsync();
    // Dispose cannot await, so attach a continuation that only runs when stopping failed.
    stopTask?.ContinueWith(
        t => this.logger.LogWarning(t.Exception, "{ProviderName}, failed to stop the mqtt client", this.GetType().Name),
        TaskContinuationOptions.OnlyOnFaulted);
}
/// <summary>
/// Starts, restarts or cancels throttling of one event batch.
/// </summary>
/// <param name="processorMetaConfigName">Config name of the processor that raises the event.</param>
/// <param name="eventName">Name of the event.</param>
/// <param name="eventBatchId">Batch id of the event.</param>
/// <param name="period">
/// How long the event stays throttled, counted from now; null removes the throttle entry.
/// </param>
public void Throttle(string processorMetaConfigName, string eventName, string eventBatchId, TimeSpan? period)
{
    // Same key layout as the lookup done when an event is routed.
    string throttleKey = processorMetaConfigName + eventName + eventBatchId;

    if (!period.HasValue)
    {
        this.throttledEvents.TryRemove(throttleKey, out _);
        return;
    }

    var freshEntry = new ThrottledEvent()
    {
        EventName = eventName,
        EventBatchId = eventBatchId,
        throttlePeriod = period.Value,
        throttleStartTime = DateTime.Now
    };

    this.throttledEvents.AddOrUpdate(
        throttleKey,
        freshEntry,
        (existingKey, current) =>
        {
            // An existing entry is restarted in place with the new period.
            current.throttlePeriod = period.Value;
            current.throttleStartTime = DateTime.Now;
            return current;
        });
}
- }
- }
|