using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using App.Shared.DeviceRPC;
using Applications.FDC;
using Edge.Core.IndustryStandardInterface.NetworkController;
using Edge.Core.IndustryStandardInterface.Pump;
using Edge.Core.Processor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Wayne.FDCPOSLibrary;

namespace DeviceInfoToAliIotHubViaGateway
{
    /// <summary>
    /// Reports nozzle state and fuelling data from the local FDC server to Ali IoT Hub over MQTT.
    /// See https://help.aliyun.com/document_detail/73734.html
    /// With sub-device dynamic registration, the dynamic registration switch is turned on in the
    /// console and each sub-device DeviceName is pre-registered. The gateway registers on behalf of
    /// the sub-device, the cloud validates the sub-device DeviceName and then issues a DeviceSecret.
    /// The sub-device then connects with its device certificate (ProductKey, DeviceName, DeviceSecret).
    /// By using this App which acts as a gateway, multiple products and devices can be delegated
    /// over a single Mqtt connection.
    /// </summary>
    /// <remarks>
    /// NOTE(review): generic type arguments in this file were reconstructed from usage
    /// (e.g. the response DTO names of the topo-add / online / offline calls follow the
    /// Request/Response naming pattern of the types that are visibly used). Confirm against
    /// the DTO declarations before merging.
    /// </remarks>
    public class App : IAppProcessor
    {
        // TODO(review): the values below are developer-machine specific and were hard-coded inline;
        // they are kept unchanged here and should eventually come from configuration.
        private const string AliMqttServerHostSuffix = ".iot-as-mqtt.cn-shanghai.aliyuncs.com";
        private const int AliMqttServerPort = 1883;
        private const string GatewayDeviceNameSuffix = "_on_shawn_laptop";

        /// <summary>
        /// Local MQTT client id used for the dynamic registration call.
        /// NOTE(review): every device registered in direct mode uses this same client id.
        /// </summary>
        private const string LocalMqttClientId = "23423423shawnLaptopMAC";

        public string MetaConfigName { get; set; }

        #region Gateway and sub-devices, sub-device dynamic registration https://help.aliyun.com/document_detail/73734.html
        // Device identity registration: https://help.aliyun.com/document_detail/89298.html
        // A device must be registered (given an identity) before it can go online.

        /// <summary>Sub-device dynamic registration, uplink, request topic.</summary>
        private readonly string dynamicDeviceReg_Request_FormatStr = "/sys/{productKey}/{deviceName}/thing/sub/register";

        /// <summary>Sub-device dynamic registration, uplink, reply topic.</summary>
        private readonly string dynamicDeviceReg_Response_FormatStr = "/sys/{productKey}/{deviceName}/thing/sub/register_reply";

        /// <summary>Add device topology relation, uplink, request topic.</summary>
        private readonly string dynamicDeviceTopoAdd_Request_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/add";

        /// <summary>Add device topology relation, uplink, reply topic.</summary>
        private readonly string dynamicDeviceTopoAdd_Response_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/add_reply";

        /// <summary>Delete device topology relation, uplink, request topic.</summary>
        private readonly string dynamicDeviceTopoDelete_Request_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/delete";

        /// <summary>Delete device topology relation, uplink, reply topic.</summary>
        private readonly string dynamicDeviceTopoDelete_Response_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/delete_reply";

        /// <summary>Get device topology relation, uplink, request topic.</summary>
        private readonly string dynamicDeviceTopoGet_Request_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/get";

        /// <summary>Get device topology relation, uplink, reply topic.</summary>
        private readonly string dynamicDeviceTopoGet_Response_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/get_reply";

        /// <summary>Report list of discovered devices, uplink, request topic.</summary>
        private readonly string dynamicDeviceListFound_Request_FormatStr = "/sys/{productKey}/{deviceName}/thing/list/found";

        /// <summary>Report list of discovered devices, uplink, reply topic.</summary>
        private readonly string dynamicDeviceListFound_Response_FormatStr = "/sys/{productKey}/{deviceName}/thing/list/found_reply";

        /// <summary>
        /// Notify the gateway to add a topology relation, request topic.
        /// Tells the gateway to initiate topology-add for a sub-device; can be combined with the
        /// "report discovered devices" feature. The device's result can be obtained through data forwarding.
        /// </summary>
        private readonly string dynamicDeviceTopoAddNotify_Request_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/add/notify";

        /// <summary>Notify the gateway to add a topology relation, reply topic.</summary>
        private readonly string dynamicDeviceTopoAddNotify_Response_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/add/notify_reply";

        /// <summary>
        /// Sub-device online (login), uplink, request topic.
        /// Sub-device online/offline messages only support QoS=0, not QoS=1.
        /// Before going online a sub-device must be registered on the IoT platform and have a
        /// topology relation with the gateway; the platform validates the sub-device identity
        /// against that topology when it goes online.
        /// Because the sub-device talks to the platform through the gateway channel, this is a
        /// gateway topic: {productKey} and {deviceName} in the topic are the gateway's, while
        /// productKey and deviceName in the message body are the sub-device's.
        /// </summary>
        private readonly string dynamicDeviceOnline_Request_FormatStr = "/ext/session/{productKey}/{deviceName}/combine/login";

        /// <summary>Sub-device online (login), reply topic. Same topic/body conventions as the request.</summary>
        private readonly string dynamicDeviceOnline_Response_FormatStr = "/ext/session/{productKey}/{deviceName}/combine/login_reply";

        /// <summary>
        /// Sub-device offline (logout), uplink, request topic.
        /// Only QoS=0 is supported. This is a gateway topic: {productKey} and {deviceName} in the
        /// topic are the gateway's, the values in the message body are the sub-device's.
        /// </summary>
        private readonly string dynamicDeviceOffline_Request_FormatStr = "/ext/session/{productKey}/{deviceName}/combine/logout";

        /// <summary>Sub-device offline (logout), reply topic. Same topic/body conventions as the request.</summary>
        private readonly string dynamicDeviceOffline_Response_FormatStr = "/ext/session/{productKey}/{deviceName}/combine/logout_reply";
        #endregion

        #region device specific topic, for Alink JSON
        /// <summary>
        /// Device reports properties, uplink (Alink JSON).
        /// Local device property change at local should publish the msg on this topic.
        /// The concrete deviceName must be filled in.
        /// </summary>
        private readonly string TopicPropertySetInLocalFormatStr = "/sys/{productKey}/{deviceName}/thing/event/property/post";

        /// <summary>Device reports properties, uplink (Alink JSON): reply from the cloud.</summary>
        private readonly string TopicPropertySetInLocalReplyFormatStr = "/sys/{productKey}/{deviceName}/thing/event/property/post_reply";

        /// <summary>
        /// Set device properties, downlink (Alink JSON).
        /// Local device property change at remote should sub the msg on this topic.
        /// The concrete deviceName must be filled in.
        /// </summary>
        private readonly string TopicPropertySetInRemoteFormatStr = "/sys/{productKey}/{deviceName}/thing/service/property/set";

        /// <summary>Set device properties, downlink (Alink JSON): reply to the cloud.</summary>
        private readonly string TopicPropertySetInRemoteReplyFormatStr = "/sys/{productKey}/{deviceName}/thing/service/property/set_reply";

        /// <summary>
        /// Device event report, uplink (Alink JSON).
        /// Local device event fire should publish the msg on this topic.
        /// The concrete deviceName must be filled in.
        /// </summary>
        private readonly string TopicEventFormatStr = "/sys/{productKey}/{deviceName}/thing/event/{tsl.event.identifier}/post";

        /// <summary>Device event report, uplink (Alink JSON): reply from the cloud.</summary>
        private readonly string TopicEventReplyFormatStr = "/sys/{productKey}/{deviceName}/thing/event/{tsl.event.identifier}/post_reply";

        /// <summary>
        /// Device service invocation, downlink (Alink JSON).
        /// Local device service invoked at remote should sub the msg on this topic.
        /// The concrete deviceName must be filled in.
        /// </summary>
        private readonly string TopicServiceInvokedInRemoteFormatStr = "/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}";

        /// <summary>Device service invocation, downlink (Alink JSON): reply to the cloud.</summary>
        private readonly string TopicServiceInvokedInRemoteReplyFormatStr = "/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}_reply";
        #endregion

        /// <summary>
        /// Device info of sub-devices that successfully went online in AliIotHub is kept here.
        /// NOTE(review): the element type was not recoverable from the source; the list is only
        /// ever appended to with the 'data' of the online response, so object is used - tighten
        /// to the concrete DTO type.
        /// </summary>
        private List<object> onlineDevices = new List<object>();

        /// <summary>
        /// Direct-mode connections, keyed by remote device name (name prefix + site level nozzle id).
        /// The state-change handlers publish only for devices found here.
        /// </summary>
        private Dictionary<string, AliIotHubMqttClientInitializer> dynamicRegisteredDeviceInfos
            = new Dictionary<string, AliIotHubMqttClientInitializer>();

        private FdcServerHostApp fdcServerHostApp = null;
        private bool isStopped = false;
        private bool mqttClientInitialized = false;
        private ILogger logger = NullLogger.Instance;
        private AppConfigV1 appConfig;

        // The three members below are currently unused; they belong to a disabled
        // queued-publish path and are retained as-is.
        Queue<Tuple<string, byte[]>> publishQueue = new Queue<Tuple<string, byte[]>>();
        private int isInPublishStackLoop = 0;
        private int maxQueueDepth = 20;

        public class AppConfigV1
        {
            /// <summary>
            /// Whether gateway / sub-device mode is enabled. When enabled, the gateway device is
            /// registered first, it then registers each sub-device by proxy and uploads the topology.
            /// In this mode make sure the sub-devices are already pre-registered, because
            /// "dynamic registration without pre-registration" is not supported for sub-devices.
            /// Otherwise every device connects directly and may use the
            /// "dynamic registration without pre-registration" flow.
            /// </summary>
            public bool EnableGatewayMode { get; set; }

            public AliIotGatewayProductInfo GatewayProductInfo { get; set; }

            public AliIotLogicalNozzleProductInfo LogicalNozzleProductInfo { get; set; }
        }

        public class AliIotGatewayProductInfo
        {
            public string DynamicGatewayDeviceName { get; set; }

            public string ProductKey { get; set; }

            public string ProductSecret { get; set; }
        }

        public class AliIotLogicalNozzleProductInfo
        {
            public string ProductKey { get; set; }

            public string ProductSecret { get; set; }

            public string DynamicDeviceNamePrefix { get; set; }
        }

        internal class DynamicDeviceInfo
        {
            public string clientId { get; set; }

            public string productKey { get; set; }

            public string deviceName { get; set; }

            public string deviceToken { get; set; }
        }

        /// <summary>
        /// Creates the app with a logger resolved from <paramref name="serviceProvider"/> and the given configuration.
        /// </summary>
        /// <param name="serviceProvider">Must be able to resolve an <see cref="ILoggerFactory"/>.</param>
        /// <param name="appConfig">Gateway / nozzle product settings used for registration and topics.</param>
        public App(IServiceProvider serviceProvider, AppConfigV1 appConfig)
        {
            var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();
            this.logger = loggerFactory.CreateLogger("DynamicPrivate_DeviceInfoToAliIotHubViaGateway");
            this.appConfig = appConfig;
        }

        /// <summary>
        /// Locates the <see cref="FdcServerHostApp"/> among <paramref name="processors"/> and hooks its
        /// pump state-change and fuelling-status-change events so they are published to the cloud.
        /// </summary>
        /// <param name="processors">All processors hosted by the runtime.</param>
        /// <exception cref="ArgumentNullException">No FdcServerHostApp is present in <paramref name="processors"/>.</exception>
        public void Init(IEnumerable<IProcessor> processors)
        {
            this.fdcServerHostApp = processors.OfType<FdcServerHostApp>().FirstOrDefault();
            if (this.fdcServerHostApp == null)
            {
                // Exception type kept for callers; the two-argument overload is used so the text is
                // the message and not the parameter name.
                throw new ArgumentNullException(nameof(processors), "Can't find the FdcServerHostApp from processors");
            }

            // Shared by both handlers below; used as the Alink request id.
            int propertyChangeRequestIdCounter = 1;

            this.fdcServerHostApp.OnStateChange += async (s, a) =>
            {
                if (!this.mqttClientInitialized || this.isStopped)
                {
                    return;
                }

                // This lambda is async void: an escaping exception would be unobservable by the
                // event source and could terminate the process, so everything is guarded.
                try
                {
                    if (!(s is IFdcPumpController pump))
                    {
                        return;
                    }

                    var nozzleExtraInfos = this.fdcServerHostApp.GetNozzleExtraInfos();
                    var nozzleProductKey = this.appConfig.LogicalNozzleProductInfo.ProductKey;
                    foreach (var nozzle in pump.Nozzles)
                    {
                        var nozzleExtraInfo = nozzleExtraInfos.FirstOrDefault(
                            n => n.PumpId == pump.PumpId && n.NozzleLogicalId == nozzle.LogicalId);
                        if (nozzleExtraInfo?.SiteLevelNozzleId == null)
                        {
                            continue;
                        }

                        var localDeviceName = this.appConfig.LogicalNozzleProductInfo.DynamicDeviceNamePrefix + nozzleExtraInfo.SiteLevelNozzleId;
                        if (!this.dynamicRegisteredDeviceInfos.TryGetValue(localDeviceName, out var initor))
                        {
                            continue;
                        }

                        if (await initor.MqttClient.QueryStatusAsync() != NetworkState.NetworkConnected)
                        {
                            continue;
                        }

                        var propertyRequest = new Alink_PropertyChanging_FromLocal()
                        {
                            id = propertyChangeRequestIdCounter++.ToString(),
                            Params = new Dictionary<string, object>()
                            {
                                { "LogicalId", nozzle.LogicalId },
                                { "SiteLevelNozzleId", nozzleExtraInfo.SiteLevelNozzleId },
                                { "ProductName", nozzleExtraInfo.ProductName },
                                { "Price", (nozzle.ExpectingPriceOnFcSide / Math.Pow(10, pump.PriceDecimalDigits)) ?? -1 },
                                { "PumpId", pump.PumpId },
                                { "CurState", (int)(a.NewPumpState) },
                            },
                        };

                        var pubPropertyResult = await initor.MqttClient.PublishAsync(
                            0,
                            this.TopicPropertySetInLocalFormatStr.TopicMaker(nozzleProductKey, localDeviceName),
                            Encoding.UTF8.GetBytes(JsonSerializer.Serialize(propertyRequest)),
                            Mqtt_QosLevel.AtMostOnce,
                            false);
                        if (!pubPropertyResult)
                        {
                            this.logger.LogInformation(" fdcServerHostApp.OnStateChange, publish property to remote for local device: " + localDeviceName + " failed, will skip report local property change");
                        }

                        var eventRequest = new Alink_EventFiring_FromLocal("OnNozzleStateChanged")
                        {
                            id = propertyChangeRequestIdCounter++.ToString(),
                            Params = new
                            {
                                value = new Dictionary<string, object>()
                                {
                                    { "CurState", (int)(a.NewPumpState) },
                                },
                            },
                        };

                        var pubEventResult = await initor.MqttClient.PublishAsync(
                            0,
                            this.TopicEventFormatStr.TopicMaker(nozzleProductKey, localDeviceName, "OnNozzleStateChanged"),
                            Encoding.UTF8.GetBytes(JsonSerializer.Serialize(eventRequest)),
                            Mqtt_QosLevel.AtMostOnce,
                            false);
                        if (!pubEventResult)
                        {
                            this.logger.LogInformation(" fdcServerHostApp.OnStateChange, publish event to remote for local device: " + localDeviceName + " failed, will skip report local event firing");
                        }
                    }
                }
                catch (Exception ex)
                {
                    this.logger.LogError(ex, " fdcServerHostApp.OnStateChange, reporting to remote exceptioned");
                }
            };

            this.fdcServerHostApp.OnCurrentFuellingStatusChange += async (s, a) =>
            {
                if (!this.mqttClientInitialized || this.isStopped)
                {
                    return;
                }

                // async void handler: see the guard rationale in the OnStateChange handler.
                try
                {
                    if (!(s is IFdcPumpController pump))
                    {
                        return;
                    }

                    var nozzleExtraInfos = this.fdcServerHostApp.GetNozzleExtraInfos();
                    var nozzleProductKey = this.appConfig.LogicalNozzleProductInfo.ProductKey;
                    foreach (var nozzle in pump.Nozzles)
                    {
                        var nozzleExtraInfo = nozzleExtraInfos.FirstOrDefault(
                            n => n.PumpId == pump.PumpId && n.NozzleLogicalId == nozzle.LogicalId);
                        if (nozzleExtraInfo?.SiteLevelNozzleId == null)
                        {
                            continue;
                        }

                        var localDeviceName = this.appConfig.LogicalNozzleProductInfo.DynamicDeviceNamePrefix + nozzleExtraInfo.SiteLevelNozzleId;
                        if (!this.dynamicRegisteredDeviceInfos.TryGetValue(localDeviceName, out var initor))
                        {
                            continue;
                        }

                        if (await initor.MqttClient.QueryStatusAsync() != NetworkState.NetworkConnected)
                        {
                            continue;
                        }

                        var overallStateInfo = new Dictionary<string, object>
                        {
                            { "PumpId", pump.PumpId },
                            { "LogicalId", nozzle.LogicalId },
                            { "SiteLevelNozzleId", nozzleExtraInfo.SiteLevelNozzleId },
                            { "ProductName", nozzleExtraInfo.ProductName },
                            { "Price", (nozzle.ExpectingPriceOnFcSide / Math.Pow(10, pump.PriceDecimalDigits)) ?? -1 },
                        };

                        var saleVolume = a.Transaction.Volumn / Math.Pow(10, pump.VolumeDecimalDigits);
                        var saleAmount = a.Transaction.Amount / Math.Pow(10, pump.AmountDecimalDigits);
                        if (!a.Transaction.Finished)
                        {
                            // Fuelling still in progress: report the running sale.
                            overallStateInfo.Add("CurSaleVol", saleVolume);
                            overallStateInfo.Add("CurSaleAmt", saleAmount);
                            overallStateInfo.Add("CurState", (int)LogicalDeviceState.FDC_FUELLING);
                        }
                        else
                        {
                            // Fuelling finished: move the figures to the "last sale" properties and
                            // reset the running sale.
                            overallStateInfo.Add("LastSaleVol", saleVolume);
                            overallStateInfo.Add("LastSaleAmt", saleAmount);
                            overallStateInfo.Add("TotalizerVol", (a.Transaction.VolumeTotalizer / Math.Pow(10, pump.VolumeTotalizerDecimalDigits)) ?? -1);
                            overallStateInfo.Add("LastSaleTime", (a.FuelingEndTime ?? DateTime.Now).ToString());
                            overallStateInfo.Add("CurSaleVol", 0);
                            overallStateInfo.Add("CurSaleAmt", 0);
                            overallStateInfo.Add("CurState", (int)LogicalDeviceState.FDC_READY);
                        }

                        var request = new Alink_PropertyChanging_FromLocal()
                        {
                            id = propertyChangeRequestIdCounter++.ToString(),
                            Params = overallStateInfo,
                        };

                        var pubResult = await initor.MqttClient.PublishAsync(
                            0,
                            this.TopicPropertySetInLocalFormatStr.TopicMaker(nozzleProductKey, localDeviceName),
                            Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request)),
                            Mqtt_QosLevel.AtMostOnce,
                            false);
                        if (!pubResult)
                        {
                            this.logger.LogInformation(" fdcServerHostApp.OnCurrentFuellingStatusChange, publish to remote for local device: " + localDeviceName + " failed, will skip report local property change");
                        }
                    }
                }
                catch (Exception ex)
                {
                    this.logger.LogError(ex, " fdcServerHostApp.OnCurrentFuellingStatusChange, reporting to remote exceptioned");
                }
            };
        }

        /// <summary>
        /// Builds the string that is HMAC-signed (with the sub-device secret) for topo-add and
        /// online requests.
        /// </summary>
        private static string BuildSubDeviceSignContent(string clientId, string deviceName, string productKey, string timestamp)
        {
            return $"clientId{clientId}deviceName{deviceName}productKey{productKey}timestamp{timestamp}";
        }

        /// <summary>
        /// Returns the site level nozzle id (as text) of every nozzle on every pump that has one defined.
        /// Nozzles without extra info, or without a site level id, are skipped.
        /// </summary>
        private List<string> GetSiteLevelNozzleIds()
        {
            var nozzleExtraInfos = this.fdcServerHostApp.GetNozzleExtraInfos();
            return this.fdcServerHostApp.FdcPumpControllers
                .SelectMany(
                    p => p.Nozzles,
                    (p, nz) => nozzleExtraInfos.FirstOrDefault(n => n.PumpId == p.PumpId && n.NozzleLogicalId == nz.LogicalId))
                // FirstOrDefault may yield null when a nozzle has no extra info configured.
                .Where(info => info?.SiteLevelNozzleId != null)
                .Select(info => info.SiteLevelNozzleId.ToString())
                .ToList();
        }

        /// <summary>
        /// Asks the cloud, through the gateway connection, to dynamically register the given sub-devices.
        /// </summary>
        /// <param name="gatewayDeviceProductKey">Product key of the gateway (used in the topic).</param>
        /// <param name="gatewayDeviceName">Device name of the gateway (used in the topic).</param>
        /// <param name="subDevices">Sub-devices to register; only ProductKey and DeviceName are sent.</param>
        /// <param name="initor">Holder of the connected gateway MQTT client.</param>
        /// <returns>The cloud reply, or null when the publish failed.</returns>
        private async Task<DynamicDeviceRegResponse> DynamicDeviceRegAsync(
            string gatewayDeviceProductKey,
            string gatewayDeviceName,
            IEnumerable<Device3Tuple> subDevices,
            AliIotHubMqttClientInitializer initor)
        {
            // Subscribe for the reply before publishing so the reply cannot be missed.
            var waitDeviceRegResponseTask = initor.MqttClient.SubscribeAndWaitForThenUnsubscribe<DynamicDeviceRegResponse>(
                this.dynamicDeviceReg_Response_FormatStr.TopicMaker(gatewayDeviceProductKey, gatewayDeviceName),
                10000,
                Mqtt_QosLevel.AtMostOnce);

            var request = new DynamicDeviceRegRequest()
            {
                id = "223",
                Params = subDevices
                    .Select(p => new DynamicDevice2TupleParam() { productKey = p.ProductKey, deviceName = p.DeviceName })
                    .ToList(),
            };

            var pubResult = await initor.MqttClient.PublishAsync(
                0,
                this.dynamicDeviceReg_Request_FormatStr.TopicMaker(gatewayDeviceProductKey, gatewayDeviceName),
                Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request)),
                Mqtt_QosLevel.AtMostOnce,
                false);
            if (!pubResult)
            {
                return null;
            }

            return await waitDeviceRegResponseTask;
        }

        /// <summary>
        /// Adds the topology relation gateway -> sub-devices in the cloud. Each sub-device entry is
        /// signed with HMAC-SHA1 using that sub-device's secret.
        /// </summary>
        /// <returns>The cloud reply, or null when the publish failed.</returns>
        // NOTE(review): the response type name is reconstructed from the Request/Response naming pattern.
        private async Task<DynamicDeviceTopoAddResponse> DynamicDeviceTopoAddAsync(
            string gatewayProductKey,
            string gatewayDeviceName,
            IEnumerable<Device3Tuple> subDevices,
            AliIotHubMqttClientInitializer initor)
        {
            var waitDeviceTopoAddResponseTask = initor.MqttClient.SubscribeAndWaitForThenUnsubscribe<DynamicDeviceTopoAddResponse>(
                this.dynamicDeviceTopoAdd_Response_FormatStr.TopicMaker(gatewayProductKey, gatewayDeviceName),
                8000,
                Mqtt_QosLevel.AtMostOnce);

            var request = new DynamicDeviceTopoAddRequest()
            {
                id = "123",
                Params = new List<DynamicDeviceTopoAddRequestParam>(),
            };

            foreach (var subDevice in subDevices)
            {
                var subDeviceClientId = subDevice.ProductKey + "&" + subDevice.DeviceName;
                var timestamp = DateTime.Now.Ticks.ToString();
                var toBeHashing = BuildSubDeviceSignContent(subDeviceClientId, subDevice.DeviceName, subDevice.ProductKey, timestamp);
                request.Params.Add(
                    new DynamicDeviceTopoAddRequestParam()
                    {
                        productKey = subDevice.ProductKey,
                        deviceName = subDevice.DeviceName,
                        sign = Encoding.UTF8.GetBytes(toBeHashing).SignWithHMacSHA1(Encoding.UTF8.GetBytes(subDevice.DeviceSecret)),
                        timestamp = timestamp,
                        clientId = subDeviceClientId,
                    });
            }

            var pubResult = await initor.MqttClient.PublishAsync(
                0,
                this.dynamicDeviceTopoAdd_Request_FormatStr.TopicMaker(gatewayProductKey, gatewayDeviceName),
                Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request)),
                Mqtt_QosLevel.AtMostOnce,
                false);
            if (!pubResult)
            {
                return null;
            }

            return await waitDeviceTopoAddResponseTask;
        }

        /// <summary>
        /// Brings one sub-device online through the gateway connection.
        /// A single batch may bring at most 5 sub-devices online/offline.
        /// </summary>
        /// <returns>The cloud reply, or null when the publish failed.</returns>
        // NOTE(review): the response type name is reconstructed from the Request/Response naming pattern.
        private async Task<DynamicDeviceOnlineResponse> DynamicDeviceOnlineAsync(
            string gatewayProductKey,
            string gatewayDeviceName,
            DynamicDeviceRegResponseParam subDevice,
            AliIotHubMqttClientInitializer initor)
        {
            var waitDeviceOnlineResponseTask = initor.MqttClient.SubscribeAndWaitForThenUnsubscribe<DynamicDeviceOnlineResponse>(
                this.dynamicDeviceOnline_Response_FormatStr.TopicMaker(gatewayProductKey, gatewayDeviceName),
                8000,
                Mqtt_QosLevel.AtMostOnce);

            var subDeviceClientId = subDevice.productKey + "&" + subDevice.deviceName;
            var timestamp = DateTime.Now.Ticks.ToString();
            var toBeHashing = BuildSubDeviceSignContent(subDeviceClientId, subDevice.deviceName, subDevice.productKey, timestamp);
            var request = new DynamicDeviceOnlineRequest()
            {
                id = "123",
                Params = new DynamicDeviceOnlineRequestParam()
                {
                    productKey = subDevice.productKey,
                    deviceName = subDevice.deviceName,
                    clientId = subDeviceClientId,
                    timestamp = timestamp,
                    sign = Encoding.UTF8.GetBytes(toBeHashing).SignWithHMacSHA1(Encoding.UTF8.GetBytes(subDevice.deviceSecret)),
                },
            };

            var pubResult = await initor.MqttClient.PublishAsync(
                0,
                this.dynamicDeviceOnline_Request_FormatStr.TopicMaker(gatewayProductKey, gatewayDeviceName),
                Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request)),
                Mqtt_QosLevel.AtMostOnce,
                false);
            if (!pubResult)
            {
                return null;
            }

            return await waitDeviceOnlineResponseTask;
        }

        /// <summary>
        /// Takes one sub-device offline through the gateway connection.
        /// </summary>
        /// <returns>The cloud reply, or null when the publish failed.</returns>
        // NOTE(review): the response type name is reconstructed from the Request/Response naming pattern.
        private async Task<DynamicDeviceOfflineResponse> DynamicDeviceOfflineAsync(
            string gatewayProductKey,
            string gatewayDeviceName,
            Device3Tuple subDevice,
            AliIotHubMqttClientInitializer initor)
        {
            // The reply arrives on the *_reply topic; waiting on the request topic (as the code did
            // before) can never observe the cloud's answer.
            var waitDeviceOfflineResponseTask = initor.MqttClient.SubscribeAndWaitForThenUnsubscribe<DynamicDeviceOfflineResponse>(
                this.dynamicDeviceOffline_Response_FormatStr.TopicMaker(gatewayProductKey, gatewayDeviceName),
                8000,
                Mqtt_QosLevel.AtMostOnce);

            var request = new DynamicDeviceOfflineRequest()
            {
                id = "123",
                Params = new DynamicDevice2TupleParam()
                {
                    productKey = subDevice.ProductKey,
                    deviceName = subDevice.DeviceName,
                },
            };

            var pubResult = await initor.MqttClient.PublishAsync(
                0,
                this.dynamicDeviceOffline_Request_FormatStr.TopicMaker(gatewayProductKey, gatewayDeviceName),
                Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request)),
                Mqtt_QosLevel.AtMostOnce,
                false);
            if (!pubResult)
            {
                return null;
            }

            return await waitDeviceOfflineResponseTask;
        }

        /// <summary>
        /// Gateway mode: registers and connects the gateway device, then registers the nozzle
        /// sub-devices, adds their topology and brings them online.
        /// </summary>
        /// <returns>
        /// true when the caller's loop should keep running (success, or gateway registration failed
        /// and should be retried); false when a sub-device step failed and the loop should end.
        /// </returns>
        /// <remarks>
        /// NOTE(review): sub-device failures end the background loop without retry while
        /// mqttClientInitialized stays true - this mirrors the previous behaviour.
        /// NOTE(review): nothing in this mode populates dynamicRegisteredDeviceInfos, so the
        /// state-change handlers do not publish in gateway mode - confirm whether that is intended.
        /// </remarks>
        private async Task<bool> InitializeGatewayModeAsync()
        {
            var gateway = this.appConfig.GatewayProductInfo;
            var nozzleProduct = this.appConfig.LogicalNozzleProductInfo;

            // Register the gateway device first.
            var targetDynamicGatewayDeviceName = gateway.DynamicGatewayDeviceName + GatewayDeviceNameSuffix;
            string aliMqttServerUrl = gateway.ProductKey + AliMqttServerHostSuffix;
            var iotInitor = new AliIotHubMqttClientInitializer(new FastMqttClient(this.logger));
            var gatewayDeviceInfo = await iotInitor.DynamicRegDeviceWithoutPreRegAsync(
                aliMqttServerUrl,
                AliMqttServerPort,
                LocalMqttClientId,
                gateway.ProductKey,
                gateway.ProductSecret,
                targetDynamicGatewayDeviceName);
            if (gatewayDeviceInfo == null)
            {
                this.logger.LogInformation("DynamicDeviceInitConn failed, will retry later");
                await Task.Delay(5000);
                return true;
            }

            // The close is required: the connection must be re-established with the real token.
            await iotInitor.MqttClient.CloseAsync();
            await iotInitor.DynamicDeviceConnAsync(aliMqttServerUrl, AliMqttServerPort, gatewayDeviceInfo);
            this.mqttClientInitialized = true;
            this.onlineDevices = new List<object>();

            // --- Sub-device dynamic registration: the gateway registers the non-smart sub-devices in the cloud.
            var siteLevelNozzleIds = this.GetSiteLevelNozzleIds();
            DynamicDeviceRegResponse regResponse;
            try
            {
                this.logger.LogInformation($"========>子设备的动态注册,请确保子设备已经预注册了,网关-子设备 不支持 动态注册-免预注册 子设备. total device count: {siteLevelNozzleIds.Count}");
                regResponse = await this.DynamicDeviceRegAsync(
                    gateway.ProductKey,
                    targetDynamicGatewayDeviceName,
                    siteLevelNozzleIds.Select(id => new Device3Tuple(nozzleProduct.ProductKey, nozzleProduct.DynamicDeviceNamePrefix + id, "")),
                    iotInitor);
                if (regResponse == null)
                {
                    this.logger.LogInformation(" 子设备的动态注册 failed due to timedout or no data from remote");
                    return false;
                }

                if (regResponse.code != 200)
                {
                    this.logger.LogInformation(" 子设备的动态注册 failed due to return code: " + regResponse.code + ", message: " + (regResponse.message ?? ""));
                    return false;
                }

                if (regResponse.data == null || regResponse.data.Count == 0)
                {
                    this.logger.LogInformation(" 子设备的动态注册 failed, 请确保子设备已经 预注册 了,因为 网关-子设备 结构下 不支持对子设备进行 动态注册-免预注册");
                    return false;
                }

                this.logger.LogInformation(" 子设备的动态注册 succeed");
            }
            catch (Exception ex)
            {
                this.logger.LogInformation(" 子设备的动态注册 exceptioned: " + ex);
                return false;
            }

            // --- Add device topology relation.
            try
            {
                this.logger.LogInformation("========>添加设备拓扑关系");
                var topoAddResponse = await this.DynamicDeviceTopoAddAsync(
                    gateway.ProductKey,
                    targetDynamicGatewayDeviceName,
                    regResponse.data.Select(s => new Device3Tuple(s.productKey, s.deviceName, s.deviceSecret)),
                    iotInitor);
                if (topoAddResponse == null)
                {
                    this.logger.LogInformation(" 添加设备拓扑关系 failed due to timedout or no data from remote");
                    return false;
                }

                if (topoAddResponse.code != 200)
                {
                    this.logger.LogInformation(" 添加设备拓扑关系 failed due to return code: " + topoAddResponse.code);
                    return false;
                }

                this.logger.LogInformation(" 添加设备拓扑关系 succeed for PumpDevices: " + string.Join(", ", regResponse.data.Select(i => i.deviceName)));
            }
            catch (Exception ex)
            {
                this.logger.LogInformation(" 添加设备拓扑关系 exceptioned: " + ex);
                return false;
            }

            // --- Bring sub-devices online, one at a time; a failing device does not stop the others.
            try
            {
                this.logger.LogInformation("========>子设备上线");
                foreach (var regDevice in regResponse.data)
                {
                    var onlineResponse = await this.DynamicDeviceOnlineAsync(
                        gateway.ProductKey,
                        targetDynamicGatewayDeviceName,
                        regDevice,
                        iotInitor);
                    if (onlineResponse == null)
                    {
                        this.logger.LogInformation(" 子设备上线 failed for local device: " + regDevice.deviceName + " due to timedout or no data from remote");
                        continue;
                    }

                    if (onlineResponse.code != 200)
                    {
                        this.logger.LogInformation(" 子设备上线 failed for local device: " + regDevice.deviceName + " due to return code: " + onlineResponse.code);
                        continue;
                    }

                    this.onlineDevices.Add(onlineResponse.data);
                    this.logger.LogInformation(" 子设备上线 succeed for local device: " + regDevice.deviceName);
                }
            }
            catch (Exception ex)
            {
                this.logger.LogInformation(" 子设备上线 exceptioned: " + ex);
                return false;
            }

            return true;
        }

        /// <summary>
        /// Direct mode: every nozzle with a site level id is dynamically registered (without
        /// pre-registration) and connected with its own MQTT client.
        /// </summary>
        /// <returns>Always true: the caller's loop keeps running.</returns>
        private async Task<bool> InitializeDirectModeAsync()
        {
            var nozzleProduct = this.appConfig.LogicalNozzleProductInfo;
            string aliMqttServerUrl = nozzleProduct.ProductKey + AliMqttServerHostSuffix;

            foreach (var siteLevelNozzleId in this.GetSiteLevelNozzleIds())
            {
                var targetDynamicDeviceName = nozzleProduct.DynamicDeviceNamePrefix + siteLevelNozzleId;
                var iotInitor = new AliIotHubMqttClientInitializer(new FastMqttClient(this.logger));
                var deviceInfo = await iotInitor.DynamicRegDeviceWithoutPreRegAsync(
                    aliMqttServerUrl,
                    AliMqttServerPort,
                    LocalMqttClientId,
                    nozzleProduct.ProductKey,
                    nozzleProduct.ProductSecret,
                    targetDynamicDeviceName);
                if (deviceInfo == null)
                {
                    this.logger.LogInformation($"DynamicRegDeviceWithoutPreRegAsync failed for nozzle with sitelevelId: {siteLevelNozzleId}");
                    continue;
                }

                // Indexer instead of Add: if a previous attempt failed half way, the retry must
                // replace the stale entry rather than throw on the duplicate key.
                this.dynamicRegisteredDeviceInfos[targetDynamicDeviceName] = iotInitor;

                // The close is required: the connection must be re-established with the real token.
                await iotInitor.MqttClient.CloseAsync();
                await iotInitor.DynamicDeviceConnAsync(aliMqttServerUrl, AliMqttServerPort, deviceInfo);
            }

            this.mqttClientInitialized = true;
            return true;
        }

        /// <summary>
        /// Starts a background loop that (re)tries to initialise the MQTT connection(s) once per
        /// second until initialisation succeeded, the app is stopped, or a sub-device step fails
        /// in gateway mode.
        /// </summary>
        /// <returns>A completed task with true; initialisation itself continues in the background.</returns>
        public Task<bool> Start()
        {
            this.isStopped = false;

            // Fire and forget: failures are logged inside the loop.
            _ = Task.Run(async () =>
            {
                await Task.Delay(1000);
                while (!this.isStopped)
                {
                    if (!this.mqttClientInitialized)
                    {
                        this.logger.LogInformation("Initializing Mqtt client...");
                        try
                        {
                            var keepRunning = this.appConfig.EnableGatewayMode
                                ? await this.InitializeGatewayModeAsync()
                                : await this.InitializeDirectModeAsync();
                            if (!keepRunning)
                            {
                                return;
                            }
                        }
                        catch (Exception ex)
                        {
                            this.logger.LogError(ex, "Initializing Mqtt client exceptioned");
                        }
                    }

                    await Task.Delay(1000);
                }
            });

            return Task.FromResult(true);
        }

        /// <summary>
        /// Stops reporting: the event handlers become no-ops and the background loop exits on its
        /// next iteration.
        /// </summary>
        /// <returns>A completed task with true.</returns>
        /// <remarks>
        /// TODO(review): sub-devices are not taken offline in the cloud here; the offline call
        /// (DynamicDeviceOfflineAsync) exists but is not wired in.
        /// </remarks>
        public Task<bool> Stop()
        {
            this.isStopped = true;
            return Task.FromResult(true);
        }
    }
}