// LocalMqttCommunicatorProvider.cs
  1. using Edge.Core.Processor;
  2. using Edge.Core.IndustryStandardInterface.Pump;
  3. using Edge.Core.UniversalApi;
  4. using Microsoft.Extensions.DependencyInjection;
  5. using Microsoft.Extensions.Logging;
  6. using Microsoft.Extensions.Logging.Abstractions;
  7. using MQTTnet.Extensions.ManagedClient;
  8. using System;
  9. using System.Collections.Generic;
  10. using System.IO;
  11. using System.Linq;
  12. using System.Reflection;
  13. using System.Text;
  14. using System.Text.Json;
  15. using System.Text.Json.Serialization;
  16. using System.Threading.Tasks;
  17. using Microsoft.CodeAnalysis.CSharp.Syntax;
  18. using Edge.Core.Configuration;
  19. using Edge.Core.Processor.Dispatcher.Attributes;
  20. using Edge.Core.UniversalApi.CommunicationProvider;
  21. using System.Diagnostics.CodeAnalysis;
  22. using System.Collections.Concurrent;
  23. using MQTTnet.Client;
  24. using MQTTnet.Packets;
  25. namespace Edge.Core.UniversalApi
  26. {
  27. public class LocalMqttCommunicatorProvider : ICommunicationProvider, IEventThrottler
  28. {
  /// <summary>
  /// Logger used by this provider. Starts as a no-op logger and is replaced in <see cref="SetupAsync"/>.
  /// </summary>
  protected ILogger logger = NullLogger.Instance;

  /// <summary>
  /// Service provider from which configuration, logging, the MQTT client and the api invoker are resolved.
  /// </summary>
  private readonly IServiceProvider services;

  /// <summary>
  /// Managed MQTT client. It is assigned in <see cref="SetupAsync"/>, so it is null until setup has run.
  /// </summary>
  protected IManagedMqttClient mqttClient;

  /// <summary>
  /// One throttle registration: the event name and batch id it applies to, when it started and how long it lasts.
  /// </summary>
  private sealed class ThrottledEvent
  {
      public string EventName;
      public string EventBatchId;
      public DateTime throttleStartTime;
      public TimeSpan throttlePeriod;
  }

  /// <summary>
  /// Active throttle registrations, keyed by processor config name + event name + batch id (concatenated).
  /// </summary>
  private readonly ConcurrentDictionary<string, ThrottledEvent> throttledEvents = new ConcurrentDictionary<string, ThrottledEvent>();

  /// <summary>
  /// Options used for every JSON payload this provider serializes.
  /// </summary>
  private readonly JsonSerializerOptions jsonSerializerOptions;

  /// <summary>
  /// Registered MQTT topics. Key: topic string.
  /// Value tuple: the UniversalApiInfo, the api name, and the topic usage doc string.
  /// </summary>
  private readonly Dictionary<string, Tuple<UniversalApiInfo, string, string>> mqttApiInfos
      = new Dictionary<string, Tuple<UniversalApiInfo, string, string>>();
  /// <summary>
  /// Creates the provider, keeping the service provider for later resolution and preparing the
  /// JSON serializer options (indented output, case-insensitive property names, enums as strings).
  /// </summary>
  /// <param name="services">Service provider that dependencies are resolved from.</param>
  internal LocalMqttCommunicatorProvider(IServiceProvider services)
  {
      var serializerOptions = new JsonSerializerOptions
      {
          PropertyNameCaseInsensitive = true,
          WriteIndented = true,
      };
      serializerOptions.Converters.Add(new JsonStringEnumConverter());

      this.jsonSerializerOptions = serializerOptions;
      this.services = services;
  }
  // Topic templates. TopicMaker fills the placeholders {sys/}, {endpointFullTypeName}, {processorName}
  // and {universalApi.MethodName}. A trailing '+' is the MQTT single-level wildcard that stands for
  // the per-call session id; the reply is published with '+' replaced by the id of the request.

  /// <summary>
  /// Device property call, downstream: an external caller invokes a Universal Api Property
  /// of an App or Handler by publishing to this topic.
  /// </summary>
  private readonly string Topic_PropertyFormatStr
      = "/{sys/}{endpointFullTypeName}/{processorName}/thing/property/{universalApi.MethodName}/+";

  /// <summary>
  /// Device property call, upstream: the reply sent back to the external caller.
  /// </summary>
  private readonly string Topic_PropertyReplyFormatStr
      = "/{sys/}{endpointFullTypeName}/{processorName}/thing/property/{universalApi.MethodName}_reply/+";

  /// <summary>
  /// Device service call, downstream: an external caller invokes a Universal Api method
  /// of an App or Handler. The local provider subscribes to this topic to receive the call.
  /// </summary>
  private readonly string Topic_ServiceFormatStr
      = "/{sys/}{endpointFullTypeName}/{processorName}/thing/service/{universalApi.MethodName}/+";

  /// <summary>
  /// Device service call, upstream: the reply sent back to the external caller.
  /// </summary>
  private readonly string Topic_ServiceReplyFormatStr
      = "/{sys/}{endpointFullTypeName}/{processorName}/thing/service/{universalApi.MethodName}_reply/+";

  /// <summary>
  /// Device event report, upstream (Alink JSON): a locally fired event is published on this topic.
  /// </summary>
  private readonly string Topic_EventFormatStr
      = "/{sys/}{endpointFullTypeName}/{processorName}/thing/event/{universalApi.MethodName}/post";
  /// <summary>
  /// Produces a concrete topic from one of the topic templates by substituting its placeholders.
  /// </summary>
  /// <param name="topicFormatStr">Template containing the {sys/}, {endpointFullTypeName}, {processorName} and {universalApi.MethodName} placeholders.</param>
  /// <param name="processor">Processor whose handler/app type name and config name go into the topic.</param>
  /// <param name="universalApiServiceOrPropertyOrEventName">Name of the service, property or event.</param>
  /// <returns>The topic with every placeholder replaced.</returns>
  protected virtual string TopicMaker(string topicFormatStr, IProcessor processor, string universalApiServiceOrPropertyOrEventName)
  {
      var metaPartsDescriptor = processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType()
          .GetCustomAttributes<MetaPartsDescriptor>()
          .FirstOrDefault();

      // Only components flagged as system-internal get the "sys/" prefix; all others get none.
      bool isSystemInternal = metaPartsDescriptor?.IsSystemInternalComponent ?? false;
      string sysSegment = isSystemInternal ? "sys/" : "";

      string endpointFullTypeName = processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType().FullName;

      string topic = topicFormatStr.Replace("{sys/}", sysSegment);
      topic = topic.Replace("{endpointFullTypeName}", endpointFullTypeName);
      topic = topic.Replace("{processorName}", processor.MetaConfigName);
      return topic.Replace("{universalApi.MethodName}", universalApiServiceOrPropertyOrEventName);
  }
  /// <summary>
  /// Builds the options for the managed client that connects to the local MQTT server.
  /// The port comes from the "MqttServerTcpListeningPort" meta configuration parameter, or "8388" when it is absent.
  /// </summary>
  /// <returns>Managed client options with a 30 second auto-reconnect delay.</returns>
  protected virtual ManagedMqttClientOptions GetManagedMqttClientOptions()
  {
      var configuredParameters = this.services.GetRequiredService<Configurator>().MetaConfiguration.Parameter;
      var portText = configuredParameters
          ?.FirstOrDefault(p => p.Name == "MqttServerTcpListeningPort")
          ?.Value
          ?? "8388";

      // No last-will message and no TLS are configured: a last will is almost meaningless for the
      // local mqtt server since client and server live in the same process and mostly start and stop together.
      var clientOptions = new MqttClientOptionsBuilder()
          .WithClientId("universalApiFccClient_local")
          .WithTcpServer("localhost", int.Parse(portText))
          .Build();

      var managedOptionsBuilder = new ManagedMqttClientOptionsBuilder()
          .WithAutoReconnectDelay(TimeSpan.FromSeconds(30))
          .WithClientOptions(clientOptions);
      return managedOptionsBuilder.Build();
  }
  /// <summary>
  /// Rebuilds the MQTT topic table for every Universal Api exposed by <paramref name="processors"/>,
  /// attaches the connection and message handlers, subscribes to the service/property request topics
  /// and starts the managed MQTT client.
  /// </summary>
  /// <param name="processors">Processors whose Universal Api services, properties and events are exposed over MQTT.</param>
  /// <exception cref="ArgumentNullException"><paramref name="processors"/> is null.</exception>
  /// <exception cref="ArgumentException">Two apis resolve to the same topic.</exception>
  public virtual async Task SetupAsync(IEnumerable<IProcessor> processors)
  {
      if (processors == null)
          throw new ArgumentNullException(nameof(processors));

      var loggerFactory = this.services.GetRequiredService<ILoggerFactory>();
      this.logger = loggerFactory.CreateLogger("Main");
      this.mqttClient?.Dispose();
      this.mqttClient = this.services.GetRequiredService<IManagedMqttClient>();
      var mqttClientOptions = this.GetManagedMqttClientOptions();
      var universalApiInvoker = this.services.GetRequiredService<UniversalApiInvoker>();

      this.mqttClient.DisconnectedAsync += (arg) =>
      {
          this.logger.LogInformation($"{this.GetType().Name}, Disconnected from mqtt server, ReasonString: {arg.ReasonString ?? ""}, ResultCode: {arg.ConnectResult?.ResultCode}, exception: {arg.Exception?.ToString() ?? ""}");
          return Task.CompletedTask;
      };
      this.mqttClient.ConnectedAsync += (arg) =>
      {
          this.logger.LogInformation($"{this.GetType().Name}, Connected to mqtt server, ResultCode: {arg.ConnectResult?.ResultCode}");
          return Task.CompletedTask;
      };
      this.mqttClient.ConnectingFailedAsync += (arg) =>
      {
          this.logger.LogInformation($"{this.GetType().Name}, Connecting Failed to mqtt server, ResultCode: {arg.ConnectResult?.ResultCode}, exception: {arg.Exception?.ToString() ?? ""}");
          return Task.CompletedTask;
      };

      this.mqttApiInfos.Clear();
      List<string> serviceTopics = this.RegisterApiTopics(processors);

      this.mqttClient.ApplicationMessageReceivedAsync += async e =>
      {
          var requestContext = new RequestContext(e);

          // incoming topic is like: /sys/FdcServerApp/Applications.FDC.FdcServerHostApp/thing/service/GetPumpsLayout/112233
          // everything before the last '/' identifies the api, the last level is the caller's session id.
          string requestingTopic = e.ApplicationMessage.Topic;
          int lastSlashIndex = requestingTopic?.LastIndexOf('/') ?? -1;
          if (lastSlashIndex <= 0)
          {
              this.logger.LogDebug(this.GetType().Name + ", ignored msg on a topic without an api part: " + (requestingTopic ?? ""));
              return;
          }

          string requestingTopicStaticPart = requestingTopic.Substring(0, lastSlashIndex);
          string requestingTopicSessionIdPart = requestingTopic.Substring(lastSlashIndex + 1);

          if (!this.TryFindApiCallInfo(requestingTopicStaticPart, out var serviceOrPropertyCallInfo))
              return;

          var apiInfo = serviceOrPropertyCallInfo.Item1;
          if (apiInfo.ServiceApiInfo == null && apiInfo.PropertyApiInfo == null)
          {
              // Event topics are publish-only; there is nothing to invoke and no reply topic for them.
              this.logger.LogDebug(this.GetType().Name + ", ignored msg on a topic that is not a Service or Property stub: " + requestingTopic);
              return;
          }

          string serviceOrPropertyReplyTopic = null;
          object serviceOrPropertyCallResult = null;
          try
          {
              // Resolve the reply topic first, so that every later failure (including an
              // unparsable payload) can still be reported back to the caller.
              serviceOrPropertyReplyTopic = apiInfo.ServiceApiInfo != null
                  ? this.TopicMaker(this.Topic_ServiceReplyFormatStr, apiInfo.Processor, apiInfo.ServiceApiInfo.Name)
                  : this.TopicMaker(this.Topic_PropertyReplyFormatStr, apiInfo.Processor, apiInfo.PropertyApiInfo.Name);
              serviceOrPropertyReplyTopic = serviceOrPropertyReplyTopic.Replace("+", requestingTopicSessionIdPart);

              JsonElement[] inputParameters = null;
              try
              {
                  var payload = e.ApplicationMessage.Payload;
                  if (payload != null && payload.Any())
                      inputParameters = ToInputParameters(JsonSerializer.Deserialize<JsonElement>(payload));
              }
              catch (Exception parseError)
              {
                  throw new InvalidDataException("Could not parse incoming Mqtt msg Payload as a Json Object or Json Array", parseError);
              }

              if (apiInfo.ServiceApiInfo != null)
              {
                  int suppliedParameterCount = inputParameters?.Length ?? 0;
                  if (apiInfo.ServiceApiInfo.GetParameters().Length != suppliedParameterCount)
                      throw new InvalidDataException("Could not find Universal Api Service in the processor has Method Parameters count == "
                          + suppliedParameterCount);

                  serviceOrPropertyCallResult =
                      await universalApiInvoker.InvokeUniversalApiServiceAsync(
                          apiInfo.Processor,
                          apiInfo.ServiceApiInfo.Name,
                          inputParameters,
                          requestContext);
              }
              else
              {
                  if (inputParameters != null && inputParameters.Length >= 2)
                      throw new InvalidDataException("Unexpected too many parameters passed in. when calling a Property, either post a single JsonObject, or a JsonArray with single element of JsonObject");

                  if (inputParameters == null)
                  {
                      // no payload: call 'get'
                      serviceOrPropertyCallResult =
                          universalApiInvoker.InvokeUniversalApiPropertyGet(
                              apiInfo.Processor,
                              apiInfo.PropertyApiInfo.Name,
                              requestContext);
                  }
                  else
                  {
                      // payload present: call 'set'; an empty JSON array sets a null value
                      JsonElement? setValue = null;
                      if (inputParameters.Length == 1)
                          setValue = inputParameters[0];

                      serviceOrPropertyCallResult =
                          universalApiInvoker.InvokeUniversalApiPropertySet(
                              apiInfo.Processor,
                              apiInfo.PropertyApiInfo.Name, setValue, requestContext);
                  }
              }
          }
          catch (Exception callError)
          {
              serviceOrPropertyCallResult = new
              {
                  ErrorCode = 400,
                  Message = callError.ToString()
              };
          }

          if (serviceOrPropertyReplyTopic == null)
          {
              // The reply topic itself could not be built, so there is nowhere to send the error.
              this.logger.LogError(this.GetType().Name + ", could not resolve a reply topic for msg on topic: " + requestingTopic);
              return;
          }

          var serviceCallResultJsonBytes = JsonSerializer.SerializeToUtf8Bytes(serviceOrPropertyCallResult, this.jsonSerializerOptions);
          await this.mqttClient.EnqueueAsync(new ManagedMqttApplicationMessage()
          {
              ApplicationMessage = new MQTTnet.MqttApplicationMessage()
              {
                  Topic = serviceOrPropertyReplyTopic,
                  Payload = serviceCallResultJsonBytes
              }
          });
      };

      // The request topics already end with the '+' wildcard that stands for the per-call session id.
      await this.mqttClient.SubscribeAsync(serviceTopics.Select(t =>
          new MQTTnet.Packets.MqttTopicFilter()
          {
              Topic = t
          }).ToList());
      await this.mqttClient.StartAsync(mqttClientOptions);
  }

  /// <summary>
  /// Fills <see cref="mqttApiInfos"/> with the request, reply and event topics of every processor that
  /// exposes Universal Apis, and returns the request topics that must be subscribed to.
  /// </summary>
  private List<string> RegisterApiTopics(IEnumerable<IProcessor> processors)
  {
      var serviceTopics = new List<string>();
      var processorDescriptors = processors.Select(p => p.ProcessorDescriptor())
          .Where(pd => pd.UniversalApiInfos != null && pd.UniversalApiInfos.Any());

      foreach (var processorDesc in processorDescriptors)
      {
          // Service and Property apis: one request topic (subscribed) plus one reply topic (published).
          foreach (var serviceOrPropertyApiInfo in
              processorDesc.UniversalApiInfos.Where(i => i.ServiceApiInfo != null || i.PropertyApiInfo != null))
          {
              bool isService = serviceOrPropertyApiInfo.ServiceApiInfo != null;
              string apiName = isService
                  ? serviceOrPropertyApiInfo.ServiceApiInfo.Name
                  : serviceOrPropertyApiInfo.PropertyApiInfo.Name;
              string description = serviceOrPropertyApiInfo.ApiAttribute.Description ?? "";

              string subscribeTopic = this.TopicMaker(
                  isService ? this.Topic_ServiceFormatStr : this.Topic_PropertyFormatStr,
                  processorDesc.Processor,
                  apiName);
              this.logger.LogDebug(this.GetType().Name
                  + (isService ? ", will sub Service stub: " : ", will sub Property stub: ")
                  + subscribeTopic);
              serviceTopics.Add(subscribeTopic);
              this.mqttApiInfos.Add(subscribeTopic,
                  new Tuple<UniversalApiInfo, string, string>(serviceOrPropertyApiInfo, apiName, description));

              string replyTopic = this.TopicMaker(
                  isService ? this.Topic_ServiceReplyFormatStr : this.Topic_PropertyReplyFormatStr,
                  processorDesc.Processor,
                  apiName);
              this.mqttApiInfos.Add(replyTopic,
                  new Tuple<UniversalApiInfo, string, string>(serviceOrPropertyApiInfo, apiName + "_Reply", description));
          }

          // Event apis: a single topic the provider publishes on.
          foreach (var eventUniversalApiInfo in
              processorDesc.UniversalApiInfos.Where(i => i.EventApiInfo != null))
          {
              string eventName = eventUniversalApiInfo.EventApiInfo?.EventName ?? "";
              string eventTopic = this.TopicMaker(this.Topic_EventFormatStr,
                  eventUniversalApiInfo.Processor,
                  eventName);
              this.mqttApiInfos.Add(eventTopic,
                  new Tuple<UniversalApiInfo, string, string>(
                      eventUniversalApiInfo,
                      eventName,
                      eventUniversalApiInfo.ApiAttribute.Description ?? ""));
          }
      }

      return serviceTopics;
  }

  /// <summary>
  /// Finds the registration an incoming request belongs to. An exact match on
  /// "<paramref name="requestingTopicStaticPart"/>/+" wins, so that an api whose name is a prefix of
  /// another (Foo vs FooBar) is never confused with it. Otherwise the first registered topic that
  /// contains the static part is used, which keeps topics from a customized TopicMaker resolvable.
  /// </summary>
  private bool TryFindApiCallInfo(string requestingTopicStaticPart, out Tuple<UniversalApiInfo, string, string> apiCallInfo)
  {
      if (this.mqttApiInfos.TryGetValue(requestingTopicStaticPart + "/+", out apiCallInfo))
          return true;

      foreach (var registered in this.mqttApiInfos)
      {
          if (registered.Key.IndexOf(requestingTopicStaticPart, StringComparison.Ordinal) >= 0)
          {
              apiCallInfo = registered.Value;
              return true;
          }
      }

      apiCallInfo = null;
      return false;
  }

  /// <summary>
  /// Converts the parsed payload into the parameter list of a call: a JSON array becomes its elements,
  /// an undefined element becomes null (no parameters), anything else becomes a single parameter.
  /// </summary>
  private static JsonElement[] ToInputParameters(JsonElement payloadRoot)
  {
      switch (payloadRoot.ValueKind)
      {
          case JsonValueKind.Array:
              return payloadRoot.EnumerateArray().ToArray();
          case JsonValueKind.Undefined:
              return null;
          default:
              return new JsonElement[] { payloadRoot };
      }
  }
  /// <summary>
  /// Publishes an event raised by <paramref name="eventSource"/> on its MQTT event topic, unless the
  /// event's batch is currently throttled.
  /// </summary>
  /// <param name="eventSource">Processor that raised the event.</param>
  /// <param name="eventDescriptor">Event name and data; the data is serialized to JSON as the payload.</param>
  /// <returns>
  /// true when the event was queued for publishing or deliberately dropped by throttling;
  /// false when the client is not started/connected, the processor exposes no such event api, or routing failed.
  /// </returns>
  public virtual async Task<bool> RouteEventAsync(IProcessor eventSource, EventDescriptor eventDescriptor)
  {
      if (this.mqttClient == null || !this.mqttClient.IsStarted || !this.mqttClient.IsConnected)
          return false;
      try
      {
          // Throttling only applies to events whose data exposes a non-empty "BatchId" property.
          if (!this.throttledEvents.IsEmpty
              && eventDescriptor.Data?.GetType().GetProperty("BatchId") is PropertyInfo batchIdProperty)
          {
              var batchId = batchIdProperty.GetValue(eventDescriptor.Data)?.ToString();
              if (!string.IsNullOrEmpty(batchId))
              {
                  var key = eventSource.MetaConfigName + eventDescriptor.Name + batchId;
                  if (this.throttledEvents.TryGetValue(key, out ThrottledEvent throttled))
                  {
                      if (DateTime.Now.Subtract(throttled.throttleStartTime) > throttled.throttlePeriod)
                      {
                          // the throttle period has elapsed: forget it and let the event through
                          this.throttledEvents.TryRemove(key, out _);
                      }
                      else
                      {
                          // throttled
                          return true;
                      }
                  }
              }
          }

          var processorDescription = eventSource.ProcessorDescriptor();
          var eventApiInfo = processorDescription?.UniversalApiInfos?.FirstOrDefault(
              i => i.EventApiInfo != null && i.EventApiInfo.EventName == eventDescriptor.Name);
          if (eventApiInfo == null)
              return false;

          var eventTopic = this.TopicMaker(this.Topic_EventFormatStr,
              eventSource,
              eventDescriptor.Name);
          var eventDataJsonBytes = JsonSerializer.SerializeToUtf8Bytes(eventDescriptor.Data, this.jsonSerializerOptions);
          await this.mqttClient.EnqueueAsync(new ManagedMqttApplicationMessage()
          {
              ApplicationMessage = new MQTTnet.MqttApplicationMessage()
              {
                  Topic = eventTopic,
                  Payload = eventDataJsonBytes
              }
          });
          return true;
      }
      catch (Exception routeError)
      {
          // Routing stays best-effort (callers get false), but the cause must not vanish silently.
          this.logger.LogError(routeError, "{Provider}, failed to route event '{EventName}'",
              this.GetType().Name, eventDescriptor?.Name);
          return false;
      }
  }
  /// <summary>
  /// Builds one documentation entry per registered MQTT topic (request, reply and event topics).
  /// </summary>
  /// <returns>
  /// A materialized list of documents. It is built eagerly so that enumerating the result more than
  /// once does not regenerate the schemas, and so that a later change of the topic table cannot
  /// invalidate an enumeration that is still in progress.
  /// </returns>
  public virtual IEnumerable<UniversalApiInfoDoc> GetApiDocuments()
  {
      var documents = new List<UniversalApiInfoDoc>(this.mqttApiInfos.Count);
      foreach (var registration in this.mqttApiInfos)
      {
          var apiInfo = registration.Value.Item1;
          var providerType = apiInfo.Processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType();

          var doc = new UniversalApiInfoDoc()
          {
              ProviderType = providerType.FullName,
              BaseCategory = apiInfo.ServiceApiInfo != null
                  ? "Service"
                  : (apiInfo.PropertyApiInfo != null ? "Property" : "Event"),
              ProviderConfigName = apiInfo.Processor.MetaConfigName,
              ApiName = registration.Value.Item2,
              Path = registration.Key,
              Description = registration.Value.Item3,
              // An explicit example on the attribute wins; otherwise one is generated from the schema below.
              InputParametersExampleJson = apiInfo.ApiAttribute.InputParametersExampleJson,
          };

          var rawTags = new Type[] { providerType }.ExtractProcessorMetaDescriptor(true).FirstOrDefault()?.Tags;
          if (rawTags != null)
          {
              var localizedTags = new List<string>();
              foreach (var tag in rawTags)
                  localizedTags.Add(tag.LocalizedContent("en"));
              doc.ProviderTags = localizedTags.ToArray();
          }

          string[] inputParametersJsonSchemaStrings = null;
          string outputParameterJsonSchema = null;
          if (apiInfo.ServiceApiInfo != null)
          {
              inputParametersJsonSchemaStrings = apiInfo.ServiceApiInfo.ResolveParamsJsonSchemas().ToArray();
              // The output schema is taken from the first generic argument of the return type, when there is one.
              outputParameterJsonSchema = apiInfo.ServiceApiInfo.ReturnType.GetGenericArguments().FirstOrDefault()?.ResolveParamsJsonSchema();
          }
          else if (apiInfo.PropertyApiInfo != null)
          {
              var setter = apiInfo.PropertyApiInfo.SetMethod;
              bool isReadOnly = !apiInfo.PropertyApiInfo.CanWrite || setter == null || setter.IsPrivate;
              // A read-only Property takes no input.
              if (!isReadOnly)
                  inputParametersJsonSchemaStrings = new string[] { apiInfo.PropertyApiInfo.ResolveParamsJsonSchemas() };
              outputParameterJsonSchema = apiInfo.PropertyApiInfo.ResolveParamsJsonSchemas();
          }
          else if (apiInfo.EventApiInfo != null)
          {
              outputParameterJsonSchema = apiInfo.EventApiInfo.EventDataType?.ResolveParamsJsonSchema();
          }

          doc.InputParametersJsonSchemaStrings = inputParametersJsonSchemaStrings;
          doc.InputParametersExampleJson = doc.InputParametersExampleJson ?? inputParametersJsonSchemaStrings?.GenerateParametersJsonSample();
          doc.OutputParametersJsonSchema = outputParameterJsonSchema;
          doc.OutputParametersExampleJson = doc.OutputParametersExampleJson ?? outputParameterJsonSchema?.GenerateParameterJsonSample();
          documents.Add(doc);
      }

      return documents;
  }
  /// <summary>
  /// Clears all throttle registrations and asks the managed MQTT client to stop.
  /// The stop is not awaited; a failure of the stop is logged instead of being left unobserved.
  /// </summary>
  public virtual void Dispose()
  {
      this.throttledEvents.Clear();

      var client = this.mqttClient;
      if (client == null)
          return;

      try
      {
          client.StopAsync().ContinueWith(
              stopTask => this.logger.LogWarning(stopTask.Exception, "{Provider}, stopping the mqtt client failed", this.GetType().Name),
              System.Threading.CancellationToken.None,
              TaskContinuationOptions.OnlyOnFaulted,
              TaskScheduler.Default);
      }
      catch (Exception stopError)
      {
          // Dispose must not throw, even if the client rejects the stop request synchronously.
          this.logger.LogWarning(stopError, "{Provider}, stopping the mqtt client failed", this.GetType().Name);
      }
  }
  /// <summary>
  /// Starts, restarts or cancels throttling for one event batch of one processor.
  /// </summary>
  /// <param name="processorMetaConfigName">Config name of the processor that raises the event.</param>
  /// <param name="eventName">Name of the event to throttle.</param>
  /// <param name="eventBatchId">Batch id the throttle applies to.</param>
  /// <param name="period">How long to throttle, counted from now; null removes an existing throttle.</param>
  public void Throttle(string processorMetaConfigName, string eventName, string eventBatchId, TimeSpan? period)
  {
      string throttleKey = processorMetaConfigName + eventName + eventBatchId;

      if (!period.HasValue)
      {
          this.throttledEvents.TryRemove(throttleKey, out _);
          return;
      }

      var freshEntry = new ThrottledEvent()
      {
          EventName = eventName,
          EventBatchId = eventBatchId,
          throttlePeriod = period.Value,
          throttleStartTime = DateTime.Now
      };

      // An existing registration is re-armed in place: new period, start time reset to now.
      this.throttledEvents.AddOrUpdate(
          throttleKey,
          freshEntry,
          (existingKey, current) =>
          {
              current.throttlePeriod = period.Value;
              current.throttleStartTime = DateTime.Now;
              return current;
          });
  }
  494. }
  495. }