using Edge.Core.Processor; using Edge.Core.IndustryStandardInterface.Pump; using Edge.Core.UniversalApi; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using MQTTnet.Extensions.ManagedClient; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Reflection; using System.Text; using System.Text.Json; using System.Text.Json.Serialization; using System.Threading.Tasks; using Microsoft.CodeAnalysis.CSharp.Syntax; using Edge.Core.Configuration; using Edge.Core.Processor.Dispatcher.Attributes; using Edge.Core.UniversalApi.CommunicationProvider; using System.Diagnostics.CodeAnalysis; using System.Collections.Concurrent; using MQTTnet.Client; using MQTTnet.Packets; namespace Edge.Core.UniversalApi { public class LocalMqttCommunicatorProvider : ICommunicationProvider, IEventThrottler { protected ILogger logger = NullLogger.Instance; //private UniversalApiInvoker universalApiInvoker; private IServiceProvider services; //private IEnumerable<IProcessor> processors; protected IManagedMqttClient mqttClient; private class ThrottledEvent { public string EventName; public string EventBatchId; public DateTime throttleStartTime; public TimeSpan throttlePeriod; } private ConcurrentDictionary<string, ThrottledEvent> throttledEvents = new ConcurrentDictionary<string, ThrottledEvent>(); private JsonSerializerOptions jsonSerializerOptions; /// <summary> /// Tuple is: topic string, UniversalApiInfo, ApiName, topic usage doc string. /// </summary> private Dictionary<string, Tuple<UniversalApiInfo, string, string>> mqttApiInfos = new Dictionary<string, Tuple<UniversalApiInfo, string, string>>(); internal LocalMqttCommunicatorProvider(IServiceProvider services) { this.services = services; this.jsonSerializerOptions = new JsonSerializerOptions() { WriteIndented = true, PropertyNameCaseInsensitive = true, }; this.jsonSerializerOptions.Converters.Add(new JsonStringEnumConverter()); } /// <summary> /// 设备属性调用, 下行, 外部调用 App or Handler 中的Universal Api Property. /// </summary> private string Topic_PropertyFormatStr = "/{sys/}{endpointFullTypeName}/{processorName}/thing/property/{universalApi.MethodName}/+"; /// <summary> /// 设备属性调用, 上行, 回复给外部调用端 /// </summary> private string Topic_PropertyReplyFormatStr = "/{sys/}{endpointFullTypeName}/{processorName}/thing/property/{universalApi.MethodName}_reply/+"; /// <summary> /// 设备服务调用, 下行, 外部调用 App or Handler 中的Universal Api method. /// Local device service invoked at remote should sub the msg on this topic. /// must fill the concrete: deviceName /// </summary> private string Topic_ServiceFormatStr = "/{sys/}{endpointFullTypeName}/{processorName}/thing/service/{universalApi.MethodName}/+"; /// <summary> /// 设备服务调用, 上行, 回复给外部调用端 /// </summary> private string Topic_ServiceReplyFormatStr = "/{sys/}{endpointFullTypeName}/{processorName}/thing/service/{universalApi.MethodName}_reply/+"; /// <summary> /// 设备事件上报 上行(Alink JSON) /// Local device event fire should publish the msg on this topic. /// must fill the concrete: deviceName /// </summary> private string Topic_EventFormatStr = "/{sys/}{endpointFullTypeName}/{processorName}/thing/event/{universalApi.MethodName}/post"; protected virtual string TopicMaker(string topicFormatStr, IProcessor processor, string universalApiServiceOrPropertyOrEventName) { var endpointTypeMetaPartsDescriptor = processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType().GetCustomAttributes<MetaPartsDescriptor>().FirstOrDefault(); if (endpointTypeMetaPartsDescriptor?.IsSystemInternalComponent ?? false) topicFormatStr = topicFormatStr.Replace("{sys/}", "sys/"); else topicFormatStr = topicFormatStr.Replace("{sys/}", ""); return topicFormatStr.Replace("{endpointFullTypeName}", processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType().FullName) .Replace("{processorName}", processor.MetaConfigName) .Replace("{universalApi.MethodName}", universalApiServiceOrPropertyOrEventName); } protected virtual ManagedMqttClientOptions GetManagedMqttClientOptions() { var localMqttServerTcpListeningPort = this.services.GetRequiredService<Configurator>().MetaConfiguration .Parameter?.FirstOrDefault(p => p.Name == "MqttServerTcpListeningPort")?.Value ?? "8388"; return new ManagedMqttClientOptionsBuilder() .WithAutoReconnectDelay(TimeSpan.FromSeconds(30)) .WithClientOptions(new MqttClientOptionsBuilder() .WithClientId("universalApiFccClient_local") .WithTcpServer("localhost", int.Parse(localMqttServerTcpListeningPort)) // last will message is almost meaningless for local mqtt since the client and server are in same process, and in most time they start and stop at the same time. //.WithWillMessage(new MQTTnet.MqttApplicationMessage() //{ // Topic = "mylastwill", // Payload = Encoding.UTF8.GetBytes("I was disconnected from local mqtt server") //}) //.WithTls() .Build()).Build(); } public virtual async Task SetupAsync(IEnumerable<IProcessor> processors) { var loggerFactory = services.GetRequiredService<ILoggerFactory>(); this.logger = loggerFactory.CreateLogger("Main"); this.mqttClient?.Dispose(); this.mqttClient = this.services.GetRequiredService<IManagedMqttClient>(); var mqttClientOptions = this.GetManagedMqttClientOptions(); var universalApiInvoker = services.GetRequiredService<UniversalApiInvoker>(); this.mqttClient.DisconnectedAsync += (arg) => { this.logger.LogInformation($"{this.GetType().Name}, Disconnected from mqtt server, ReasonString: {arg.ReasonString ?? ""}, ResultCode: {arg.ConnectResult?.ResultCode}, exception: {arg.Exception?.ToString() ?? ""}"); return Task.CompletedTask; }; this.mqttClient.ConnectedAsync += (arg) => { this.logger.LogInformation($"{this.GetType().Name}, Connected to mqtt server, ResultCode: {arg.ConnectResult?.ResultCode}"); return Task.CompletedTask; }; this.mqttClient.ConnectingFailedAsync += (arg) => { this.logger.LogInformation($"{this.GetType().Name}, Connecting Failed to mqtt server, ResultCode: {arg.ConnectResult?.ResultCode}, exception: {arg.Exception?.ToString() ?? ""}"); return Task.CompletedTask; }; this.mqttApiInfos.Clear(); var processorDescriptors = processors.Select(p => p.ProcessorDescriptor()) .Where(pd => pd.UniversalApiInfos != null && pd.UniversalApiInfos.Any()); List<string> serviceTopics = new List<string>(); foreach (var processorDesc in processorDescriptors) { foreach (var serviceOrPropertyApiInfo in // setup Service Api here processorDesc.UniversalApiInfos.Where(i => i.ServiceApiInfo != null || i.PropertyApiInfo != null)) { string subscribeTopic; if (serviceOrPropertyApiInfo.ServiceApiInfo != null) { subscribeTopic = this.TopicMaker(this.Topic_ServiceFormatStr, processorDesc.Processor, serviceOrPropertyApiInfo.ServiceApiInfo.Name); this.logger.LogDebug(this.GetType().Name + ", will sub Service stub: " + subscribeTopic); serviceTopics.Add(subscribeTopic); this.mqttApiInfos.Add(subscribeTopic, new Tuple<UniversalApiInfo, string, string>( serviceOrPropertyApiInfo, serviceOrPropertyApiInfo.ServiceApiInfo.Name, //"Publish to this topic for a Service call (replace the '+' with unique id for create calling session)</br>" + (serviceOrPropertyApiInfo.ApiAttribute.Description ?? ""))); var publishTopic_for_serivceReply = this.TopicMaker(this.Topic_ServiceReplyFormatStr, processorDesc.Processor, serviceOrPropertyApiInfo.ServiceApiInfo.Name); this.mqttApiInfos.Add(publishTopic_for_serivceReply, new Tuple<UniversalApiInfo, string, string>( serviceOrPropertyApiInfo, serviceOrPropertyApiInfo.ServiceApiInfo.Name + "_Reply", //"Subscribe to this topic for get Service call result (replace '+' with the id in service call)")+ (serviceOrPropertyApiInfo.ApiAttribute.Description ?? ""))); } else { subscribeTopic = this.TopicMaker(this.Topic_PropertyFormatStr, processorDesc.Processor, serviceOrPropertyApiInfo.PropertyApiInfo.Name); this.logger.LogDebug(this.GetType().Name + ", will sub Property stub: " + subscribeTopic); serviceTopics.Add(subscribeTopic); this.mqttApiInfos.Add(subscribeTopic, new Tuple<UniversalApiInfo, string, string>( serviceOrPropertyApiInfo, serviceOrPropertyApiInfo.PropertyApiInfo.Name, //"Publish to this topic for a Property call (replace the '+' with unique id for create calling session)</br>" + (serviceOrPropertyApiInfo.ApiAttribute.Description ?? ""))); var publishTopic_for_propertyReply = this.TopicMaker(this.Topic_PropertyReplyFormatStr, processorDesc.Processor, serviceOrPropertyApiInfo.PropertyApiInfo.Name); this.mqttApiInfos.Add(publishTopic_for_propertyReply, new Tuple<UniversalApiInfo, string, string>( serviceOrPropertyApiInfo, serviceOrPropertyApiInfo.PropertyApiInfo.Name + "_Reply", //"Subscribe to this topic for get Property call result (replace '+' with the id in Property call)"+ (serviceOrPropertyApiInfo.ApiAttribute.Description ?? ""))); } } foreach (var eventUniversalApiInfo in // setup Event Api here processorDesc.UniversalApiInfos.Where(i => i.EventApiInfo != null)) { var eventTopic = this.TopicMaker(this.Topic_EventFormatStr, eventUniversalApiInfo.Processor, eventUniversalApiInfo.EventApiInfo?.EventName ?? ""); this.mqttApiInfos.Add(eventTopic, new Tuple<UniversalApiInfo, string, string>( eventUniversalApiInfo, eventUniversalApiInfo.EventApiInfo?.EventName ?? "", //"Subscribe to this topic to receive an Event.") eventUniversalApiInfo.ApiAttribute.Description ?? "" )); } } this.mqttClient.ApplicationMessageReceivedAsync += (async e => { var requestContext = new RequestContext(e); //if (this.logger.IsEnabled(LogLevel.Debug)) //{ // this.logger.LogDebug($"+ received msg on Topic = {e.ApplicationMessage.Topic}"); // this.logger.LogDebug($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); // this.logger.LogDebug($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); // this.logger.LogDebug($"+ Retain = {e.ApplicationMessage.Retain}"); //} //Tuple<UniversalApiInfo, string, string> serviceCallInfo; foreach (var openTopic in this.mqttApiInfos.Keys) { // incoming topic is like: /sys/FdcServerApp/Applications.FDC.FdcServerHostApp/thing/service/GetPumpsLayout/112233 string requestingTopicStaticPart = e.ApplicationMessage.Topic.Substring(0, e.ApplicationMessage.Topic.LastIndexOf("/")); string requestingTopicSessionIdPart = e.ApplicationMessage.Topic.Substring(e.ApplicationMessage.Topic.LastIndexOf("/") + 1); if (openTopic.IndexOf(requestingTopicStaticPart) >= 0) { var serviceOrPropertyCallInfo = this.mqttApiInfos[openTopic]; string serviceOrPropertyReplyTopic = null; object serviceOrPropertyCallResult = null; try { JsonElement[] inputParameters = null; try { if (e.ApplicationMessage.Payload == null || !e.ApplicationMessage.Payload.Any()) { } else { var inputParameter = JsonSerializer.Deserialize<JsonElement>(e.ApplicationMessage.Payload); if (inputParameter.ValueKind == JsonValueKind.Array) inputParameters = inputParameter.EnumerateArray().ToArray(); else if (inputParameter.ValueKind == JsonValueKind.Object) inputParameters = new JsonElement[] { inputParameter }; else if (inputParameter.ValueKind == JsonValueKind.Undefined) { } else inputParameters = new JsonElement[] { inputParameter }; } } catch (Exception eeee) { //serviceCallResult = new //{ // ErrorCode = 400, // Message = "Could not parse incoming Mqtt msg Payload to JsonElement[]" //}; throw new InvalidDataException("Could not parse incoming Mqtt msg Payload as a Json Object or Json Array"); } if (serviceOrPropertyCallInfo.Item1.ServiceApiInfo != null) { serviceOrPropertyReplyTopic = this.TopicMaker(this.Topic_ServiceReplyFormatStr, serviceOrPropertyCallInfo.Item1.Processor, serviceOrPropertyCallInfo.Item1.ServiceApiInfo.Name).Replace("+", requestingTopicSessionIdPart); if (serviceOrPropertyCallInfo.Item1.ServiceApiInfo.GetParameters().Length != (inputParameters?.Length ?? 0)) throw new InvalidDataException("Could not find Universal Api Service in the processor has Method Parameters count == " + (inputParameters?.Length ?? 0)); var apiParameters = serviceOrPropertyCallInfo.Item1.ServiceApiInfo.GetParameters(); serviceOrPropertyCallResult = await universalApiInvoker.InvokeUniversalApiServiceAsync( serviceOrPropertyCallInfo.Item1.Processor, serviceOrPropertyCallInfo.Item1.ServiceApiInfo.Name, inputParameters, requestContext); } else { serviceOrPropertyReplyTopic = this.TopicMaker(this.Topic_PropertyReplyFormatStr, serviceOrPropertyCallInfo.Item1.Processor, serviceOrPropertyCallInfo.Item1.PropertyApiInfo.Name).Replace("+", requestingTopicSessionIdPart); if (inputParameters != null && inputParameters.Length >= 2) throw new InvalidDataException("Unexpected too many parameters passed in. when calling a Property, either post a single JsonObject, or a JsonArray with single element of JsonObject"); if (inputParameters == null) { /*call 'get'*/ serviceOrPropertyCallResult = universalApiInvoker.InvokeUniversalApiPropertyGet( serviceOrPropertyCallInfo.Item1.Processor, serviceOrPropertyCallInfo.Item1.PropertyApiInfo.Name, requestContext); } else { /*call 'set'*/ JsonElement? setValue = null; if (inputParameters.Length == 0) { } else if (inputParameters.Length >= 2) { throw new InvalidDataException("Unexpected too many parameters passed in. when calling for set value for a Property, either post a single JsonObject, or a JsonArray with single element of JsonObject"); } else setValue = inputParameters[0]; serviceOrPropertyCallResult = universalApiInvoker.InvokeUniversalApiPropertySet( serviceOrPropertyCallInfo.Item1.Processor, serviceOrPropertyCallInfo.Item1.PropertyApiInfo.Name, setValue, requestContext); } } } catch (Exception eeee) { serviceOrPropertyCallResult = new { ErrorCode = 400, Message = eeee.ToString() }; } var serviceCallResultJsonBytes = JsonSerializer.SerializeToUtf8Bytes(serviceOrPropertyCallResult, this.jsonSerializerOptions); await this.mqttClient.EnqueueAsync(new ManagedMqttApplicationMessage() { ApplicationMessage = new MQTTnet.MqttApplicationMessage() { Topic = serviceOrPropertyReplyTopic, Payload = serviceCallResultJsonBytes } }); break; } else { } } }); await this.mqttClient.SubscribeAsync(serviceTopics.Select(t => new MQTTnet.Packets.MqttTopicFilter() { // last part is the id for seperate request/response sessions // missing id part will also trigger the service call, but request/response are not pair guranteed. Topic = t// + "/+" }).ToList()); await this.mqttClient.StartAsync(mqttClientOptions); } public virtual async Task<bool> RouteEventAsync(IProcessor eventSource, EventDescriptor eventDescriptor) { if (this.mqttClient == null || !this.mqttClient.IsStarted || !this.mqttClient.IsConnected) return false; try { if (this.throttledEvents.Any() && eventDescriptor.Data?.GetType().GetProperty("BatchId") is PropertyInfo batchIdProperty) { var batchId = batchIdProperty.GetValue(eventDescriptor.Data)?.ToString(); if (!string.IsNullOrEmpty(batchId)) { var key = eventSource.MetaConfigName + eventDescriptor.Name + batchId; ThrottledEvent tEvt = null; if (this.throttledEvents.TryGetValue(key, out tEvt)) { if (DateTime.Now.Subtract(tEvt.throttleStartTime) > tEvt.throttlePeriod) { this.throttledEvents.TryRemove(key, out _); } else { //throttled return true; } } } } var processorDescription = eventSource.ProcessorDescriptor(); var eventApiInfo = processorDescription?.UniversalApiInfos.FirstOrDefault(i => i.EventApiInfo != null && i.EventApiInfo.EventName == eventDescriptor.Name); //.DeviceProcessorHandlerOrAppliation.GetType().GetCustomAttributes(typeof(UniversalApiAttribute), true)?.FirstOrDefault(); if (eventApiInfo == null) return false; var eventTopic = this.TopicMaker(this.Topic_EventFormatStr, eventSource, eventDescriptor.Name); var eventDataJsonBytes = JsonSerializer.SerializeToUtf8Bytes(eventDescriptor.Data, this.jsonSerializerOptions); await this.mqttClient.EnqueueAsync(new ManagedMqttApplicationMessage() { ApplicationMessage = new MQTTnet.MqttApplicationMessage() { Topic = eventTopic, Payload = eventDataJsonBytes } }); return true; } catch { return false; } } public virtual IEnumerable<UniversalApiInfoDoc> GetApiDocuments() { //var pretty = "<ul>" + this.mqttApiDescriptions.GroupBy(api => // api.Value.Item1.DeviceProcessorHandlerOrAppliation.GetType().FullName) // .Select(g => "<li>" + g.Key + "(api count: " + g.Count() + ") <ul>" + // g.Select(i => "<li>" + i.Value.Item2.Name + "</li><p>topic: " + i.Key + "</p><p>" + i.Value.Item3 + "</p>") // .Aggregate((acc, n) => acc + n) // + "</ul><li>") // .Aggregate((acc, n) => acc + Environment.NewLine + n) + "</ul>"; //return pretty; return this.mqttApiInfos.Keys.Select(mqttApiTopic => { var apiInfo = this.mqttApiInfos[mqttApiTopic].Item1; var doc = new UniversalApiInfoDoc() { ProviderType = apiInfo.Processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType().FullName //+ ", " + apiInfo.Processor.MetaConfigName , BaseCategory = apiInfo.ServiceApiInfo == null ? (apiInfo.PropertyApiInfo == null ? "Event" : "Property") : "Service", ProviderConfigName = apiInfo.Processor.MetaConfigName, ApiName = this.mqttApiInfos[mqttApiTopic].Item2, Path = mqttApiTopic, Description = this.mqttApiInfos[mqttApiTopic].Item3, InputParametersExampleJson = apiInfo.ApiAttribute.InputParametersExampleJson //adding dynamic generate one by schema like webapi, //InputParameters = apiInfo.ServiceApiInfo == null ? // // for event, providing the EventDataType // (apiInfo.PropertyApiInfo == null ? // (apiInfo.EventApiInfo?.EventDataType?.ToString() ?? "") // : apiInfo.PropertyApiInfo.PropertyType.ToString()) : // // for service, providing the parameter list // apiInfo.ServiceApiInfo.GetParameters()?.Select(p => p.ParameterType.Name + " " + (p.Name ?? ""))?.Aggregate("", (acc, n) => (string.IsNullOrEmpty(acc) ? "" : (acc + ", ")) + n) ?? "", }; var rawTags = new Type[] { this.mqttApiInfos[mqttApiTopic].Item1.Processor.ProcessorDescriptor().DeviceHandlerOrApp.GetType() }.ExtractProcessorMetaDescriptor(true).FirstOrDefault()?.Tags; if (rawTags != null) { var localizedTags = new List<string>(); foreach (var t in rawTags) localizedTags.Add(t.LocalizedContent("en")); doc.ProviderTags = localizedTags.ToArray(); } string[] inputParametersJsonSchemaStrings = null; string outputParameterJsonSchema = null; if (apiInfo.ServiceApiInfo != null) { inputParametersJsonSchemaStrings = apiInfo.ServiceApiInfo.ResolveParamsJsonSchemas().ToArray(); outputParameterJsonSchema = apiInfo.ServiceApiInfo.ReturnType.GetGenericArguments().FirstOrDefault()?.ResolveParamsJsonSchema(); } else if (apiInfo.PropertyApiInfo != null) { if (!apiInfo.PropertyApiInfo.CanWrite || apiInfo.PropertyApiInfo.SetMethod == null || (apiInfo.PropertyApiInfo.SetMethod?.IsPrivate ?? false)) { // indicates this Property is read-only inputParametersJsonSchemaStrings = null; } else inputParametersJsonSchemaStrings = new string[] { apiInfo.PropertyApiInfo.ResolveParamsJsonSchemas() }; outputParameterJsonSchema = apiInfo.PropertyApiInfo.ResolveParamsJsonSchemas(); } else if (apiInfo.EventApiInfo != null) { outputParameterJsonSchema = apiInfo.EventApiInfo.EventDataType?.ResolveParamsJsonSchema(); } doc.InputParametersJsonSchemaStrings = inputParametersJsonSchemaStrings; doc.InputParametersExampleJson = doc.InputParametersExampleJson ?? inputParametersJsonSchemaStrings?.GenerateParametersJsonSample(); doc.OutputParametersJsonSchema = outputParameterJsonSchema; doc.OutputParametersExampleJson = doc.OutputParametersExampleJson ?? outputParameterJsonSchema?.GenerateParameterJsonSample(); return doc; }); } public virtual void Dispose() { this.throttledEvents.Clear(); this.mqttClient?.StopAsync(); } public void Throttle(string processorMetaConfigName, string eventName, string eventBatchId, TimeSpan? period) { var key = processorMetaConfigName + eventName + eventBatchId; if (period == null) { this.throttledEvents.TryRemove(key, out _); return; } this.throttledEvents.AddOrUpdate(key, new ThrottledEvent() { EventName = eventName, EventBatchId = eventBatchId, throttlePeriod = period.Value, throttleStartTime = DateTime.Now }, (k, exist) => { exist.throttlePeriod = period.Value; exist.throttleStartTime = DateTime.Now; return exist; }); } } }