using App.Shared.DeviceRPC;
using Applications.FDC;
using Edge.Core.Processor;
using Edge.Core.IndustryStandardInterface.Pump;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Wayne.FDCPOSLibrary;
using Edge.Core.IndustryStandardInterface.NetworkController;
using Microsoft.Extensions.Logging.Abstractions;
using System.Text.Json;
namespace DeviceInfoToAliIotHubViaGateway
{
///
/// https://help.aliyun.com/document_detail/73734.html
/// 使用子设备动态注册的认证方式,在控制台打开动态注册开关,预注册子设备的DeviceName。
/// 由网关代替子设备进行注册,云端校验子设备DeviceName,校验通过后,动态下发DeviceSecret。
/// 然后子设备通过设备证书(ProductKey、DeviceName和DeviceSecret)接入物联网平台。
/// This App acts as a gateway that delegates multiple products and devices over a single MQTT connection.
///
public class App : IAppProcessor
{
/// <summary>
/// Name of the meta configuration associated with this app.
/// NOTE(review): nothing in this file reads or writes it; presumably it is set by the hosting framework — confirm.
/// </summary>
public string MetaConfigName { get; set; }
// The client ID; the device MAC address or SN code is recommended, at most 64 characters,
// with no other special requirement or association.
// NOTE(review): the disabled settings below contain what look like real product keys and a device secret;
// consider removing them from source control and rotating the secret.
//public static string AliIotHub_LocalClientId = "liteFccCore873645";
//public static string AliIotHub_GatewayProductKey = "a19OTbmuC7g";
//public static string AliIotHub_GatewayDeviceName = "xxEdgeGatewayDevice0";
//public static string AliIotHub_GatewayDeviceSecret = "DbauniPezDmAbNg3hNWP2qe0SM9rIlkr";
//public static string AliIotHub_PumpProductKey = "a19yWoc0Glx";
#region 网关与子设备,子设备的动态注册 https://help.aliyun.com/document_detail/73734.html
/*
* Device identity registration: https://help.aliyun.com/document_detail/89298.html
*
* All format strings in this region carry {productKey} and {deviceName} placeholders; in this file they are
* filled through the TopicMaker extension with the gateway device's product key and device name.
*/
/// <summary>
/// Before a device goes online it must be registered so that it can be identified.
/// Sub-device dynamic registration, upstream, request topic.
/// </summary>
private string dynamicDeviceReg_Request_FormatStr = "/sys/{productKey}/{deviceName}/thing/sub/register";
/// <summary>
/// Sub-device dynamic registration, upstream, response topic.
/// </summary>
private string dynamicDeviceReg_Response_FormatStr = "/sys/{productKey}/{deviceName}/thing/sub/register_reply";
/// <summary>
/// Add device topology relationship, upstream, request topic.
/// </summary>
private string dynamicDeviceTopoAdd_Request_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/add";
/// <summary>
/// Add device topology relationship, upstream, response topic.
/// </summary>
private string dynamicDeviceTopoAdd_Response_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/add_reply";
/// <summary>
/// Delete device topology relationship, upstream, request topic.
/// </summary>
private string dynamicDeviceTopoDelete_Request_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/delete";
/// <summary>
/// Delete device topology relationship, upstream, response topic.
/// </summary>
private string dynamicDeviceTopoDelete_Response_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/delete_reply";
/// <summary>
/// Get device topology relationship, upstream, request topic.
/// </summary>
private string dynamicDeviceTopoGet_Request_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/get";
/// <summary>
/// Get device topology relationship, upstream, response topic.
/// </summary>
private string dynamicDeviceTopoGet_Response_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/get_reply";
/// <summary>
/// Report the list of discovered devices, upstream, request topic.
/// </summary>
private string dynamicDeviceListFound_Request_FormatStr = "/sys/{productKey}/{deviceName}/thing/list/found";
/// <summary>
/// Report the list of discovered devices, upstream, response topic.
/// </summary>
private string dynamicDeviceListFound_Response_FormatStr = "/sys/{productKey}/{deviceName}/thing/list/found_reply";
/// <summary>
/// Notify the gateway to add a device topology relationship, request topic (labelled "upstream" in the original note).
/// Tells the gateway device to initiate adding a topology relationship for a sub-device; can be combined with the
/// discovered-device-list report. The result returned by the device can be obtained through data forwarding.
/// </summary>
private string dynamicDeviceTopoAddNotify_Request_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/add/notify";
/// <summary>
/// Notify the gateway to add a device topology relationship, response topic (labelled "upstream" in the original note).
/// Tells the gateway device to initiate adding a topology relationship for a sub-device; can be combined with the
/// discovered-device-list report. The result returned by the device can be obtained through data forwarding.
/// </summary>
private string dynamicDeviceTopoAddNotify_Response_FormatStr = "/sys/{productKey}/{deviceName}/thing/topo/add/notify_reply";
/// <summary>
/// Sub-device online (login), upstream, request topic.
/// Sub-device online/offline messages only support QoS=0, not QoS=1.
/// Before a sub-device goes online its identity must be registered on the IoT platform and the topology
/// relationship between the sub-device and the gateway must be established.
/// When the sub-device goes online the platform verifies its identity against the topology relationship to decide
/// whether it may use the gateway channel.
///
/// Because the sub-device talks to the platform through the gateway channel, this is a topic of the gateway device.
/// The ${productKey} and ${deviceName} variables in the topic must be replaced with the gateway device's values.
/// In the message body, productKey and deviceName carry the sub-device's values.
/// </summary>
private string dynamicDeviceOnline_Request_FormatStr = "/ext/session/{productKey}/{deviceName}/combine/login";
/// <summary>
/// Sub-device online (login), response topic (the "login_reply" counterpart of the request topic).
/// Sub-device online/offline messages only support QoS=0, not QoS=1.
///
/// Because the sub-device talks to the platform through the gateway channel, this is a topic of the gateway device.
/// The ${productKey} and ${deviceName} variables in the topic must be replaced with the gateway device's values.
/// In the message body, productKey and deviceName carry the sub-device's values.
/// </summary>
private string dynamicDeviceOnline_Response_FormatStr = "/ext/session/{productKey}/{deviceName}/combine/login_reply";
/// <summary>
/// Sub-device offline (logout), upstream, request topic.
/// Sub-device online/offline messages only support QoS=0, not QoS=1.
/// Before a sub-device goes online its identity must be registered on the IoT platform and the topology
/// relationship between the sub-device and the gateway must be established.
/// When the sub-device goes online the platform verifies its identity against the topology relationship to decide
/// whether it may use the gateway channel.
///
/// Because the sub-device talks to the platform through the gateway channel, this is a topic of the gateway device.
/// The ${productKey} and ${deviceName} variables in the topic must be replaced with the gateway device's values.
/// In the message body, productKey and deviceName carry the sub-device's values.
/// </summary>
private string dynamicDeviceOffline_Request_FormatStr = "/ext/session/{productKey}/{deviceName}/combine/logout";
/// <summary>
/// Sub-device offline (logout), response topic (the "logout_reply" counterpart of the request topic).
/// Sub-device online/offline messages only support QoS=0, not QoS=1.
///
/// Because the sub-device talks to the platform through the gateway channel, this is a topic of the gateway device.
/// The ${productKey} and ${deviceName} variables in the topic must be replaced with the gateway device's values.
/// In the message body, productKey and deviceName carry the sub-device's values.
/// </summary>
private string dynamicDeviceOffline_Response_FormatStr = "/ext/session/{productKey}/{deviceName}/combine/logout_reply";
#endregion
///
/// productKey:deviceName:deviceSecret
///
//internal static List AliIotHub_Pump_DeviceInfos
// = new List()
// {
// new Device3Tuple(AliIotHub_PumpProductKey,"Pump1","dK3iVP4kFS8NGNVpawlEEJnKahDt2YBw"),
// new Device3Tuple(AliIotHub_PumpProductKey,"Pump2","mfCtJkAlD4iEoSq4Qg6C7ZXjHTkoGyID"),
// };
//public static string AliIotHub_PumpDeviceSecret = "DbauniPezDmAbNg3hNWP2qe0SM9rIlkr";
#region device specific topic, for Alink JSON
/// <summary>
/// Device reports properties, upstream (Alink JSON).
/// A property change that happens on the local device is published on this topic.
/// The concrete deviceName (and productKey) must be filled in before use.
/// </summary>
private string TopicPropertySetInLocalFormatStr = "/sys/{productKey}/{deviceName}/thing/event/property/post";
/// <summary>
/// Device reports properties, upstream (Alink JSON): the reply from the cloud.
/// </summary>
private string TopicPropertySetInLocalReplyFormatStr = "/sys/{productKey}/{deviceName}/thing/event/property/post_reply";
/// <summary>
/// Set device properties, downstream (Alink JSON).
/// A property change requested from the remote side is received by subscribing to this topic.
/// The concrete deviceName (and productKey) must be filled in before use.
/// </summary>
private string TopicPropertySetInRemoteFormatStr = "/sys/{productKey}/{deviceName}/thing/service/property/set";
/// <summary>
/// Set device properties, downstream (Alink JSON): the reply sent back to the cloud.
/// </summary>
private string TopicPropertySetInRemoteReplyFormatStr = "/sys/{productKey}/{deviceName}/thing/service/property/set_reply";
/// <summary>
/// Device event report, upstream (Alink JSON).
/// An event fired on the local device is published on this topic.
/// The concrete deviceName, productKey and the {tsl.event.identifier} event name must be filled in before use.
/// </summary>
private string TopicEventFormatStr = "/sys/{productKey}/{deviceName}/thing/event/{tsl.event.identifier}/post";
/// <summary>
/// Device event report, upstream (Alink JSON): the reply from the cloud.
/// </summary>
private string TopicEventReplyFormatStr = "/sys/{productKey}/{deviceName}/thing/event/{tsl.event.identifier}/post_reply";
/// <summary>
/// Device service invocation, downstream (Alink JSON).
/// A service of the local device invoked from the remote side is received by subscribing to this topic.
/// The concrete deviceName, productKey and the {tsl.service.identifier} service name must be filled in before use.
/// </summary>
private string TopicServiceInvokedInRemoteFormatStr = "/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}";
/// <summary>
/// Device service invocation, downstream (Alink JSON): the reply sent back to the cloud.
/// </summary>
private string TopicServiceInvokedInRemoteReplyFormatStr = "/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}_reply";
#endregion
/// <summary>
/// Info of the devices that successfully went online in the Ali IoT hub is kept here.
/// NOTE(review): the generic type argument of this List is not visible in this view of the file — confirm the element type.
/// </summary>
private List onlineDevices = new List();
// Notes on the mqttClientId used in the low-level MQTT protocol packets:
//
// ${clientId} is the device ID; any value up to 64 characters, the device MAC address or SN code is recommended.
// securemode is the security mode: securemode=3 for a direct TCP connection, securemode=2 for direct TLS.
// signmethod is the signing algorithm; hmacmd5 and hmacsha1 are supported.
//
//public static string AliIotHub_MqttComplexClientId = AliIotHub_LocalClientId + "|securemode=3,signmethod=hmacsha1|";
// The FDC server app whose pump events are forwarded to the cloud; resolved from the processors passed to Init.
private FdcServerHostApp fdcServerHostApp = null;
// While true, the FDC event handlers registered in Init return without publishing.
private bool isStopped = false;
// Set to true by the background loop in Start once connection setup has run; the event handlers registered in
// Init return without publishing while this is false.
private bool mqttClientInitialized = false;
// Defaults to a no-op logger; replaced in the constructor with a logger created from the service provider.
private ILogger logger = NullLogger.Instance;
// Configuration handed to the constructor.
private AppConfigV1 appConfig;
/// <summary>
/// Configuration of this app: the connection mode plus the cloud product information for the gateway
/// device and for the logical-nozzle devices.
/// </summary>
public class AppConfigV1
{
/// <summary>
/// Whether gateway / sub-device mode is enabled. When enabled, the gateway device is registered first; it then
/// registers each sub-device on their behalf and uploads the topology.
/// In this mode make sure the sub-devices are already pre-registered, because dynamic registration without
/// pre-registration is not supported for sub-devices.
///
/// Otherwise every device connects directly and may go through the dynamic-registration-without-pre-registration flow.
/// </summary>
public bool EnableGatewayMode { get; set; }
/// <summary>Cloud product information used for the gateway device (read only when gateway mode is enabled).</summary>
public AliIotGatewayProductInfo GatewayProductInfo { get; set; }
/// <summary>Cloud product information used for each logical nozzle device.</summary>
public AliIotLogicalNozzleProductInfo LogicalNozzleProductInfo { get; set; }
}
/// <summary>
/// Cloud product settings for the gateway device.
/// </summary>
public class AliIotGatewayProductInfo
{
/// <summary>Base name of the gateway device; Start appends a suffix to it before registering the device.</summary>
public string DynamicGatewayDeviceName { get; set; }
/// <summary>Product key of the gateway product; also used as the host-name prefix of the MQTT server URL.</summary>
public string ProductKey { get; set; }
/// <summary>Product secret passed to the dynamic registration call for the gateway device.</summary>
public string ProductSecret { get; set; }
}
/// <summary>
/// Cloud product settings for the logical nozzle devices.
/// </summary>
public class AliIotLogicalNozzleProductInfo
{
/// <summary>Product key of the nozzle product; used in topics and as the host-name prefix of the MQTT server URL.</summary>
public string ProductKey { get; set; }
/// <summary>Product secret passed to the dynamic registration call for each nozzle device.</summary>
public string ProductSecret { get; set; }
/// <summary>Prefix that is concatenated with a nozzle's SiteLevelNozzleId to form its cloud device name.</summary>
public string DynamicDeviceNamePrefix { get; set; }
}
//public enum AliIotProductLocalTypeEnum
//{
// FCC_Gateway,
// Pump,
// ATG
//}
/// <summary>
/// Creates the app: keeps the supplied configuration and obtains a logger named
/// "DynamicPrivate_DeviceInfoToAliIotHubViaGateway" from the service provider.
/// </summary>
/// <param name="serviceProvider">Provider from which the logger factory is resolved.</param>
/// <param name="appConfig">Configuration of this app.</param>
public App(IServiceProvider serviceProvider, AppConfigV1 appConfig)
{
    this.appConfig = appConfig;
    this.logger = serviceProvider.GetRequiredService()
        .CreateLogger("DynamicPrivate_DeviceInfoToAliIotHubViaGateway");
}
// Queue of pending messages. In the visible code it is referenced only by the disabled KickPublishQueue
// method, which treats each item as a tuple of topic (Item1) and payload (Item2).
// NOTE(review): the generic type arguments are not visible in this view of the file — confirm the element type.
Queue> publishQueue = new Queue>();
/// <summary>
/// Locates the FdcServerHostApp among the supplied processors and subscribes to its pump state and
/// fuelling status events, so that each nozzle's data is published to the Ali IoT hub through the MQTT
/// client that was registered for that nozzle.
/// </summary>
/// <param name="processors">The processors hosted alongside this app; must contain an FdcServerHostApp.</param>
/// <exception cref="ArgumentNullException">No FdcServerHostApp was found in <paramref name="processors"/>.</exception>
public void Init(IEnumerable processors)
{
    this.fdcServerHostApp = processors.OfType().FirstOrDefault();
    if (this.fdcServerHostApp == null)
    {
        // Same exception type as before, but with the parameter name and the message in their proper slots.
        throw new ArgumentNullException(nameof(processors), "Can't find the FdcServerHostApp from processors");
    }

    // Request id counter shared by both handlers below; used as the "id" of every published Alink message.
    int propertyChangeRequestIdCounter = 1;

    this.fdcServerHostApp.OnStateChange += async (s, a) =>
    {
        if (!this.mqttClientInitialized) return;
        if (this.isStopped) return;
        var pump = s as IFdcPumpController;
        if (pump == null) return;
        try
        {
            foreach (var nozzle in pump.Nozzles)
            {
                var nozzleExtraInfo = this.fdcServerHostApp.GetNozzleExtraInfos().FirstOrDefault(n => n.PumpId == pump.PumpId && n.NozzleLogicalId == nozzle.LogicalId);
                // Nozzles without a site level id have no cloud device, skip them.
                if (nozzleExtraInfo?.SiteLevelNozzleId == null) continue;
                var localDeviceName = this.appConfig.LogicalNozzleProductInfo.DynamicDeviceNamePrefix + nozzleExtraInfo.SiteLevelNozzleId;
                var initor = await this.GetConnectedInitializerOrNullAsync(localDeviceName);
                if (initor == null) continue;

                // Report the nozzle's properties.
                var propertyRequest = new Alink_PropertyChanging_FromLocal()
                {
                    id = propertyChangeRequestIdCounter++.ToString(),
                    Params = new Dictionary()
                    {
                        { "LogicalId", nozzle.LogicalId },
                        {"SiteLevelNozzleId", nozzleExtraInfo.SiteLevelNozzleId},
                        {"ProductName", nozzleExtraInfo.ProductName },
                        {"Price", (nozzle.ExpectingPriceOnFcSide / Math.Pow(10, pump.PriceDecimalDigits))??-1 },
                        {"PumpId", pump.PumpId },
                        {"CurState", (int)(a.NewPumpState) }
                    }
                };
                var pubPropertyResult = await initor.MqttClient.PublishAsync(0,
                    this.TopicPropertySetInLocalFormatStr.TopicMaker(this.appConfig.LogicalNozzleProductInfo.ProductKey, localDeviceName),
                    Encoding.UTF8.GetBytes(JsonSerializer.Serialize(propertyRequest)),
                    Mqtt_QosLevel.AtMostOnce,
                    false);
                if (!pubPropertyResult)
                {
                    this.logger.LogInformation("       fdcServerHostApp.OnStateChange, publish property to remote for local device: " + localDeviceName + " failed, will skip report local property change");
                }

                // Then fire the OnNozzleStateChanged event carrying the new state.
                var pubEventResult = await initor.MqttClient.PublishAsync(0,
                    this.TopicEventFormatStr.TopicMaker(this.appConfig.LogicalNozzleProductInfo.ProductKey, localDeviceName, "OnNozzleStateChanged"),
                    Encoding.UTF8.GetBytes(JsonSerializer.Serialize(
                        new Alink_EventFiring_FromLocal("OnNozzleStateChanged")
                        {
                            id = propertyChangeRequestIdCounter++.ToString(),
                            Params = new
                            {
                                value = new Dictionary()
                                {
                                    {"CurState", (int)(a.NewPumpState) }
                                }
                            }
                        })),
                    Mqtt_QosLevel.AtMostOnce,
                    false);
                if (!pubEventResult)
                {
                    this.logger.LogInformation("       fdcServerHostApp.OnStateChange, publish event to remote for local device: " + localDeviceName + " failed, will skip report local event firing");
                }
            }
        }
        catch (Exception ex)
        {
            // This lambda is an event handler; an exception escaping it could not be observed by any caller,
            // so it is logged here instead.
            this.logger.LogError("       fdcServerHostApp.OnStateChange, reporting pump: " + pump.PumpId + " exceptioned: " + ex);
        }
    };

    this.fdcServerHostApp.OnCurrentFuellingStatusChange += async (s, a) =>
    {
        if (!this.mqttClientInitialized) return;
        if (this.isStopped) return;
        var pump = s as IFdcPumpController;
        if (pump == null) return;
        try
        {
            foreach (var nozzle in pump.Nozzles)
            {
                var nozzleExtraInfo = this.fdcServerHostApp.GetNozzleExtraInfos().FirstOrDefault(n => n.PumpId == pump.PumpId && n.NozzleLogicalId == nozzle.LogicalId);
                if (nozzleExtraInfo?.SiteLevelNozzleId == null) continue;
                var localDeviceName = this.appConfig.LogicalNozzleProductInfo.DynamicDeviceNamePrefix + nozzleExtraInfo.SiteLevelNozzleId;
                var initor = await this.GetConnectedInitializerOrNullAsync(localDeviceName);
                if (initor == null) continue;

                var overallStateInfo = new Dictionary();
                overallStateInfo.Add("PumpId", pump.PumpId);
                overallStateInfo.Add("LogicalId", nozzle.LogicalId);
                overallStateInfo.Add("SiteLevelNozzleId", nozzleExtraInfo.SiteLevelNozzleId);
                overallStateInfo.Add("ProductName", nozzleExtraInfo.ProductName);
                overallStateInfo.Add("Price", (nozzle.ExpectingPriceOnFcSide / Math.Pow(10, pump.PriceDecimalDigits)) ?? -1);
                if (!a.Transaction.Finished)
                {
                    // Fuelling in progress: report the running volume and amount.
                    overallStateInfo.Add("CurSaleVol", a.Transaction.Volumn / Math.Pow(10, pump.VolumeDecimalDigits));
                    overallStateInfo.Add("CurSaleAmt", a.Transaction.Amount / Math.Pow(10, pump.AmountDecimalDigits));
                    overallStateInfo.Add("CurState", (int)LogicalDeviceState.FDC_FUELLING);
                }
                else
                {
                    // Fuelling finished: report the final sale and reset the running values.
                    overallStateInfo.Add("LastSaleVol", a.Transaction.Volumn / Math.Pow(10, pump.VolumeDecimalDigits));
                    overallStateInfo.Add("LastSaleAmt", a.Transaction.Amount / Math.Pow(10, pump.AmountDecimalDigits));
                    overallStateInfo.Add("TotalizerVol", (a.Transaction.VolumeTotalizer / Math.Pow(10, pump.VolumeTotalizerDecimalDigits)) ?? -1);
                    overallStateInfo.Add("LastSaleTime", (a.FuelingEndTime ?? DateTime.Now).ToString());
                    overallStateInfo.Add("CurSaleVol", 0);
                    overallStateInfo.Add("CurSaleAmt", 0);
                    overallStateInfo.Add("CurState", (int)LogicalDeviceState.FDC_READY);
                }
                var request = new Alink_PropertyChanging_FromLocal()
                {
                    id = propertyChangeRequestIdCounter++.ToString(),
                    Params = overallStateInfo
                };
                var pubResult = await initor.MqttClient.PublishAsync(0,
                    this.TopicPropertySetInLocalFormatStr.TopicMaker(this.appConfig.LogicalNozzleProductInfo.ProductKey, localDeviceName),
                    Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request)),
                    Mqtt_QosLevel.AtMostOnce,
                    false);
                if (!pubResult)
                {
                    this.logger.LogInformation("       fdcServerHostApp.OnCurrentFuellingStatusChange, publish to remote for local device: " + localDeviceName + " failed, will skip report local property change");
                }
            }
        }
        catch (Exception ex)
        {
            this.logger.LogError("       fdcServerHostApp.OnCurrentFuellingStatusChange, reporting pump: " + pump.PumpId + " exceptioned: " + ex);
        }
    };
}

/// <summary>
/// Looks up the MQTT initializer registered under the given cloud device name and returns it only when its
/// client currently reports <c>NetworkState.NetworkConnected</c>.
/// </summary>
/// <param name="localDeviceName">Cloud device name (name prefix + site level nozzle id).</param>
/// <returns>The initializer, or null when none is registered or its client is not connected.</returns>
private async Task<AliIotHubMqttClientInitializer> GetConnectedInitializerOrNullAsync(string localDeviceName)
{
    AliIotHubMqttClientInitializer initor;
    if (!this.dynamicRegisteredDeviceInfos.TryGetValue(localDeviceName, out initor))
    {
        return null;
    }

    // Awaited rather than read through .Result so the calling event handler never blocks a thread.
    var status = await initor.MqttClient.QueryStatusAsync();
    return status == NetworkState.NetworkConnected ? initor : null;
}
// Re-entrancy flag for the disabled KickPublishQueue loop below (0 = idle, 1 = running, per its CompareExchange call).
// In the visible code it is not used by any active member.
private int isInPublishStackLoop = 0;
// Maximum number of queued messages the disabled KickPublishQueue loop keeps before dropping the oldest ones.
// In the visible code it is not used by any active member.
private int maxQueueDepth = 20;
// Disabled publish loop; it refers to a this.mqttClient member that is not declared in the visible code.
//private async Task KickPublishQueue()
//{
// if (0 == Interlocked.CompareExchange(ref this.isInPublishStackLoop, 1, 0))
// {
// Tuple poped;
// while (true)
// {
// while (this.publishQueue.TryDequeue(out poped))
// {
// var succeed = await this.mqttClient.PublishAsync(1,
// poped.Item1,
// poped.Item2, Mqtt_QosLevel.AtLeastOnce, false);
// if (!succeed)
// {
// this.logger.LogInformation(" Publishing to topic: " + poped.Item1 + " failed, will set mqttClientInitialized to false");
// this.mqttClientInitialized = false;
// this.publishQueue.Clear();
// }
// // max stack 5 msg, and remove the old messages.
// if (this.publishQueue.Count > maxQueueDepth) for (int i = 0; i < this.publishQueue.Count - maxQueueDepth; i++) this.publishQueue.Dequeue();
// }
// Thread.Sleep(1000);
// }
// this.isInPublishStackLoop = 0;
// }
//}
// Disabled JSON contract resolver that lower-cased the "Params" property name.
//public class AliIotHubJsonFormatResolver : DefaultContractResolver
//{
// protected override string ResolvePropertyName(string propertyName)
// {
// if (propertyName == "Params")
// return propertyName.ToLower();
// return propertyName;
// }
//}
/// <summary>
/// Asks the cloud, through the gateway device's topics, to dynamically register the given sub-devices.
/// The reply topic is subscribed before the request is published.
/// </summary>
/// <param name="gatewayDeviceProductKey">Product key of the gateway device, used to build the topics.</param>
/// <param name="gatewayDeviceName">Device name of the gateway device, used to build the topics.</param>
/// <param name="subDevices">Sub-devices to register; only their ProductKey and DeviceName are sent.</param>
/// <param name="initor">Initializer whose MQTT client is used for subscribing and publishing.</param>
/// <returns>
/// The result of waiting on the reply topic (10000 is passed as the wait limit), or null when the publish
/// is reported as failed.
/// </returns>
private async Task DynamicDeviceRegAsync(string gatewayDeviceProductKey, string gatewayDeviceName,
    IEnumerable subDevices, AliIotHubMqttClientInitializer initor)
{
    var replyTopic = this.dynamicDeviceReg_Response_FormatStr.TopicMaker(gatewayDeviceProductKey, gatewayDeviceName);
    var pendingReply = initor.MqttClient.SubscribeAndWaitForThenUnsubscribe(replyTopic, 10000, Mqtt_QosLevel.AtMostOnce);

    var registration = new DynamicDeviceRegRequest()
    {
        id = "223",
        Params = subDevices
            .Select(device => new DynamicDevice2TupleParam()
            {
                productKey = device.ProductKey,
                deviceName = device.DeviceName
            })
            .ToList()
    };

    var requestTopic = this.dynamicDeviceReg_Request_FormatStr.TopicMaker(gatewayDeviceProductKey, gatewayDeviceName);
    var payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(registration));
    var published = await initor.MqttClient.PublishAsync(0, requestTopic, payload, Mqtt_QosLevel.AtMostOnce, false);
    if (published)
    {
        return await pendingReply;
    }

    return null;
}
/// <summary>
/// Asks the cloud, through the gateway device's topics, to add a topology relationship between the gateway
/// and each of the given sub-devices. Every entry is signed with HMAC-SHA1 using the sub-device's secret.
/// The reply topic is subscribed before the request is published.
/// </summary>
/// <param name="gatewayProductKey">Product key of the gateway device, used to build the topics.</param>
/// <param name="gatewayDeviceName">Device name of the gateway device, used to build the topics.</param>
/// <param name="subDevices">Sub-devices (product key, device name, device secret) to attach to the gateway.</param>
/// <param name="initor">Initializer whose MQTT client is used for subscribing and publishing.</param>
/// <returns>
/// The result of waiting on the reply topic (8000 is passed as the wait limit), or null when the publish
/// is reported as failed.
/// </returns>
private async Task DynamicDeviceTopoAddAsync(string gatewayProductKey, string gatewayDeviceName,
    IEnumerable subDevices, AliIotHubMqttClientInitializer initor)
{
    var replyTopic = this.dynamicDeviceTopoAdd_Response_FormatStr.TopicMaker(gatewayProductKey, gatewayDeviceName);
    var pendingReply = initor.MqttClient.SubscribeAndWaitForThenUnsubscribe(replyTopic, 8000, Mqtt_QosLevel.AtMostOnce);

    var topoAddRequest = new DynamicDeviceTopoAddRequest()
    {
        id = "123",
        Params = new List(),
    };
    foreach (var child in subDevices)
    {
        var childClientId = child.ProductKey + "&" + child.DeviceName;
        var ticks = DateTime.Now.Ticks.ToString();
        // The signed content lists the fields as name/value pairs: clientId, deviceName, productKey, timestamp.
        var signContent = $"clientId{childClientId}deviceName{child.DeviceName}productKey{child.ProductKey}timestamp{ticks}";
        var signature = Encoding.UTF8.GetBytes(signContent).SignWithHMacSHA1(Encoding.UTF8.GetBytes(child.DeviceSecret));

        var entry = new DynamicDeviceTopoAddRequestParam()
        {
            productKey = child.ProductKey,
            deviceName = child.DeviceName,
            sign = signature,
            timestamp = ticks,
            clientId = childClientId
        };
        topoAddRequest.Params.Add(entry);
    }

    var requestTopic = this.dynamicDeviceTopoAdd_Request_FormatStr.TopicMaker(gatewayProductKey, gatewayDeviceName);
    var payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(topoAddRequest));
    var published = await initor.MqttClient.PublishAsync(0, requestTopic, payload, Mqtt_QosLevel.AtMostOnce, false);
    if (published)
    {
        return await pendingReply;
    }

    return null;
}
/// <summary>
/// Brings one sub-device online (login) through the gateway device's session topics. The request is signed
/// with HMAC-SHA1 using the sub-device's secret. The reply topic is subscribed before the request is published.
/// Note carried over from the original comment: a single batch may bring at most 5 sub-devices online or offline.
/// </summary>
/// <param name="gatewayProductKey">Product key of the gateway device, used to build the topics.</param>
/// <param name="gatewayDeviceName">Device name of the gateway device, used to build the topics.</param>
/// <param name="subDevice">Registration result of the sub-device (product key, device name, device secret).</param>
/// <param name="initor">Initializer whose MQTT client is used for subscribing and publishing.</param>
/// <returns>
/// The result of waiting on the reply topic (8000 is passed as the wait limit), or null when the publish
/// is reported as failed.
/// </returns>
private async Task DynamicDeviceOnlineAsync(string gatewayProductKey, string gatewayDeviceName,
    DynamicDeviceRegResponseParam subDevice, AliIotHubMqttClientInitializer initor)
{
    var replyTopic = this.dynamicDeviceOnline_Response_FormatStr.TopicMaker(gatewayProductKey, gatewayDeviceName);
    var pendingReply = initor.MqttClient.SubscribeAndWaitForThenUnsubscribe(replyTopic, 8000, Mqtt_QosLevel.AtMostOnce);

    var childClientId = subDevice.productKey + "&" + subDevice.deviceName;
    var ticks = DateTime.Now.Ticks.ToString();
    // The signed content lists the fields as name/value pairs: clientId, deviceName, productKey, timestamp.
    var signContent = $"clientId{childClientId}deviceName{subDevice.deviceName}productKey{subDevice.productKey}timestamp{ticks}";

    var loginParam = new DynamicDeviceOnlineRequestParam()
    {
        productKey = subDevice.productKey,
        deviceName = subDevice.deviceName,
        clientId = childClientId,
        timestamp = ticks,
        sign = Encoding.UTF8.GetBytes(signContent).SignWithHMacSHA1(Encoding.UTF8.GetBytes(subDevice.deviceSecret)),
    };
    var loginRequest = new DynamicDeviceOnlineRequest()
    {
        id = "123",
        Params = loginParam
    };

    var requestTopic = this.dynamicDeviceOnline_Request_FormatStr.TopicMaker(gatewayProductKey, gatewayDeviceName);
    var payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(loginRequest));
    var published = await initor.MqttClient.PublishAsync(0, requestTopic, payload, Mqtt_QosLevel.AtMostOnce, false);
    if (published)
    {
        return await pendingReply;
    }

    return null;
}
/// <summary>
/// Takes one sub-device offline (logout) through the gateway device's session topics.
/// The reply topic is subscribed before the request is published.
/// </summary>
/// <param name="gatewayProductKey">Product key of the gateway device, used to build the topics.</param>
/// <param name="gatewayDeviceName">Device name of the gateway device, used to build the topics.</param>
/// <param name="subDevice">The sub-device to log out; only its ProductKey and DeviceName are sent.</param>
/// <param name="initor">Initializer whose MQTT client is used for subscribing and publishing.</param>
/// <returns>
/// The result of waiting on the reply topic (8000 is passed as the wait limit), or null when the publish
/// is reported as failed.
/// </returns>
private async Task DynamicDeviceOfflineAsync(string gatewayProductKey, string gatewayDeviceName,
    Device3Tuple subDevice, AliIotHubMqttClientInitializer initor)
{
    // Wait on the logout *reply* topic. The request topic was subscribed here before, so the cloud's
    // answer (sent on ".../combine/logout_reply") could never be matched.
    var waitDeviceOfflineResponseTask = initor.MqttClient.SubscribeAndWaitForThenUnsubscribe(
        this.dynamicDeviceOffline_Response_FormatStr.TopicMaker(gatewayProductKey, gatewayDeviceName), 8000, Mqtt_QosLevel.AtMostOnce);
    var request = new DynamicDeviceOfflineRequest()
    {
        id = "123",
        Params = new DynamicDevice2TupleParam()
        {
            productKey = subDevice.ProductKey,
            deviceName = subDevice.DeviceName,
        }
    };
    var pubResult = await initor.MqttClient.PublishAsync(0,
        this.dynamicDeviceOffline_Request_FormatStr.TopicMaker(gatewayProductKey, gatewayDeviceName),
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request)),
        Mqtt_QosLevel.AtMostOnce, false);
    if (!pubResult) return null;
    return await waitDeviceOfflineResponseTask;
}
/// <summary>
/// Plain data holder describing a dynamically registered device.
/// The lower-case property names presumably mirror the JSON field names of the cloud payload — TODO confirm.
/// </summary>
internal class DynamicDeviceInfo
{
/// <summary>Client ID of the device.</summary>
public string clientId { get; set; }
/// <summary>Product key the device belongs to.</summary>
public string productKey { get; set; }
/// <summary>Name of the device.</summary>
public string deviceName { get; set; }
/// <summary>Token of the device; presumably issued by the cloud during dynamic registration — TODO confirm.</summary>
public string deviceToken { get; set; }
}
/// <summary>
/// Starts a background loop that sets up the connection(s) to the Ali IoT hub and then idles, re-checking
/// once per second whether initialization is (still) required.
/// In gateway mode the gateway device is registered and connected, then the nozzle sub-devices are
/// registered, attached to the gateway topology and brought online. Otherwise every nozzle that has a
/// site level id is registered and connected directly with its own MQTT client.
/// </summary>
/// <returns>A completed task carrying true; the background loop itself is not awaited.</returns>
public Task Start()
{
    Task.Run(async () =>
    {
        // Task.Delay rather than Thread.Sleep, so no thread-pool thread is blocked while waiting.
        await Task.Delay(1000);
        while (!this.isStopped)
        {
            if (!this.mqttClientInitialized)
            {
                this.logger.LogInformation("Initializing Mqtt client...");
                try
                {
                    if (this.appConfig.EnableGatewayMode)
                    {
                        // Register the gateway device first.
                        // NOTE(review): the device name suffix and the client id below are hard-coded
                        // developer values — confirm they should not come from configuration.
                        var targetDynamicGatewayDeviceName = this.appConfig.GatewayProductInfo.DynamicGatewayDeviceName + "_on_shawn_laptop";
                        string aliMqttServerUrl = this.appConfig.GatewayProductInfo.ProductKey + ".iot-as-mqtt.cn-shanghai.aliyuncs.com";
                        var iotInitor = new AliIotHubMqttClientInitializer(new FastMqttClient(this.logger));
                        var dynamicRegistedGatewayDeviceInfo = await iotInitor.DynamicRegDeviceWithoutPreRegAsync(
                            aliMqttServerUrl, 1883,
                            "23423423shawnLaptopMAC",
                            this.appConfig.GatewayProductInfo.ProductKey, this.appConfig.GatewayProductInfo.ProductSecret,
                            targetDynamicGatewayDeviceName);
                        if (dynamicRegistedGatewayDeviceInfo == null)
                        {
                            this.logger.LogInformation("DynamicDeviceInitConn failed, will retry later");
                            await Task.Delay(5000);
                            continue;
                        }
                        // the close is required, should use real token to conn again.
                        await iotInitor.MqttClient.CloseAsync();
                        await iotInitor.DynamicDeviceConnAsync(aliMqttServerUrl, 1883, dynamicRegistedGatewayDeviceInfo);
                        this.mqttClientInitialized = true;
                        this.onlineDevices = new List();
                        // NOTE(review): this branch never adds entries to dynamicRegisteredDeviceInfos, which the
                        // event handlers in Init consult before publishing — confirm whether gateway mode is
                        // expected to report nozzle data.
                        #region 子设备的动态注册,即网关FCC来帮助非智能的子设备进行云端逻辑注册
                        var pendingForRegistDevices = this.fdcServerHostApp.FdcPumpControllers.SelectMany(
                            (p, index) => p.Nozzles,
                            (p, nz) => new
                            {
                                Pump = p,
                                Nozzle = nz,
                                NozzleExtraInfo = this.fdcServerHostApp.GetNozzleExtraInfos().FirstOrDefault(n => n.PumpId == p.PumpId && n.NozzleLogicalId == nz.LogicalId)
                            })
                            // FirstOrDefault yields null for nozzles without extra info; skip them instead of throwing.
                            .Where(d => d.NozzleExtraInfo?.SiteLevelNozzleId != null).ToList();
                        DynamicDeviceRegResponse regResponse = null;
                        try
                        {
                            this.logger.LogInformation($"========>子设备的动态注册,请确保子设备已经预注册了,网关-子设备 不支持 动态注册-免预注册 子设备. total device count: {pendingForRegistDevices.Count}");
                            regResponse = await DynamicDeviceRegAsync(this.appConfig.GatewayProductInfo.ProductKey, targetDynamicGatewayDeviceName,
                                pendingForRegistDevices.Select(d =>
                                {
                                    var remoteDeviceName = this.appConfig.LogicalNozzleProductInfo.DynamicDeviceNamePrefix + d.NozzleExtraInfo.SiteLevelNozzleId;
                                    return new Device3Tuple(this.appConfig.LogicalNozzleProductInfo.ProductKey, remoteDeviceName, "");
                                }), iotInitor);
                            // Each failure path below ends the background loop for good (return / break).
                            if (regResponse == null)
                            {
                                this.logger.LogInformation("      子设备的动态注册 failed due to timedout or no data from remote");
                                return;
                            }
                            else if (regResponse.code != 200)
                            {
                                this.logger.LogInformation("      子设备的动态注册 failed due to return code: " + regResponse.code
                                    + ", message: " + (regResponse.message ?? ""));
                                return;
                            }
                            else if (regResponse.data == null || regResponse.data.Count == 0)
                            {
                                this.logger.LogInformation("      子设备的动态注册 failed, 请确保子设备已经 预注册 了,因为 网关-子设备 结构下 不支持对子设备进行 动态注册-免预注册");
                                break;
                            }
                            this.logger.LogInformation("      子设备的动态注册 succeed");
                        }
                        catch (Exception exx)
                        {
                            this.logger.LogInformation("      子设备的动态注册 exceptioned: " + exx);
                            return;
                        }
                        #endregion
                        #region 添加设备拓扑关系
                        try
                        {
                            this.logger.LogInformation("========>添加设备拓扑关系");
                            var topoAddResponse = await DynamicDeviceTopoAddAsync(
                                this.appConfig.GatewayProductInfo.ProductKey, targetDynamicGatewayDeviceName,
                                regResponse.data.Select(s => new Device3Tuple(s.productKey, s.deviceName, s.deviceSecret)), iotInitor);
                            if (topoAddResponse == null)
                            {
                                this.logger.LogInformation("      添加设备拓扑关系 failed due to timedout or no data from remote");
                                return;
                            }
                            else if (topoAddResponse.code != 200)
                            {
                                this.logger.LogInformation("      添加设备拓扑关系 failed due to return code: " + topoAddResponse.code);
                                return;
                            }
                            this.logger.LogInformation("      添加设备拓扑关系 succeed for PumpDevices: "
                                + regResponse.data.Select(i => i.deviceName).Aggregate((acc, n) => acc + ", " + n));
                        }
                        catch (Exception exxx)
                        {
                            this.logger.LogInformation("      添加设备拓扑关系 exceptioned: " + exxx);
                            return;
                        }
                        #endregion
                        #region 子设备上线
                        try
                        {
                            this.logger.LogInformation("========>子设备上线");
                            foreach (var regDevice in regResponse.data)
                            {
                                var onlineResponse
                                    = await DynamicDeviceOnlineAsync(
                                        this.appConfig.GatewayProductInfo.ProductKey, targetDynamicGatewayDeviceName, regDevice, iotInitor);
                                // A missing or non-200 reply only skips this device; the others are still tried.
                                if (onlineResponse == null)
                                {
                                    continue;
                                }
                                else if (onlineResponse.code != 200)
                                {
                                    continue;
                                }
                                this.onlineDevices.Add(onlineResponse.data);
                                this.logger.LogInformation("      子设备上线 succeed for local device: " + regDevice.deviceName);
                            }
                        }
                        catch (Exception exxx)
                        {
                            this.logger.LogInformation("      子设备上线 exceptioned: " + exxx);
                            return;
                        }
                        #endregion
                    }
                    else
                    {
                        var pendingForRegistDevices = this.fdcServerHostApp.FdcPumpControllers.SelectMany(
                            (p, index) => p.Nozzles,
                            (p, nz) => new
                            {
                                Pump = p,
                                Nozzle = nz,
                                NozzleExtraInfo = this.fdcServerHostApp.GetNozzleExtraInfos().FirstOrDefault(n => n.PumpId == p.PumpId && n.NozzleLogicalId == nz.LogicalId)
                            })
                            // FirstOrDefault yields null for nozzles without extra info; skip them instead of throwing.
                            .Where(d => d.NozzleExtraInfo?.SiteLevelNozzleId != null).ToList();
                        foreach (var device in pendingForRegistDevices)
                        {
                            var targetDynamicDeviceName = this.appConfig.LogicalNozzleProductInfo.DynamicDeviceNamePrefix + device.NozzleExtraInfo.SiteLevelNozzleId;
                            string aliMqttServerUrl = this.appConfig.LogicalNozzleProductInfo.ProductKey + ".iot-as-mqtt.cn-shanghai.aliyuncs.com";
                            var iotInitor = new AliIotHubMqttClientInitializer(new FastMqttClient(this.logger));
                            var dynamicRegistedDeviceInfo = await iotInitor.DynamicRegDeviceWithoutPreRegAsync(
                                aliMqttServerUrl, 1883, "23423423shawnLaptopMAC",
                                this.appConfig.LogicalNozzleProductInfo.ProductKey,
                                this.appConfig.LogicalNozzleProductInfo.ProductSecret,
                                targetDynamicDeviceName);
                            if (dynamicRegistedDeviceInfo == null)
                            {
                                this.logger.LogInformation($"DynamicRegDeviceWithoutPreRegAsync failed for nozzle with sitelevelId: {device.NozzleExtraInfo.SiteLevelNozzleId}");
                                continue;
                            }
                            // Indexer instead of Add: when an earlier round failed part-way, this name is already
                            // present and Add would throw on every retry, so initialization could never complete.
                            this.dynamicRegisteredDeviceInfos[targetDynamicDeviceName] = iotInitor;
                            // the close is required, should use real token to conn again.
                            await iotInitor.MqttClient.CloseAsync();
                            await iotInitor.DynamicDeviceConnAsync(aliMqttServerUrl, 1883, dynamicRegistedDeviceInfo);
                        }
                        this.mqttClientInitialized = true;
                    }
                }
                catch (Exception exxx)
                {
                    logger.LogError("Initializing Mqtt client exceptioned: " + exxx);
                }
            }
            await Task.Delay(1000);
        }
    });
    return Task.FromResult(true);
}
// Maps a cloud device name (name prefix + site level nozzle id) to the initializer whose MQTT client serves
// that device. Filled by Start in direct (non-gateway) mode and consulted by the event handlers set up in Init.
// NOTE(review): the generic type arguments are not visible in this view of the file — confirm key/value types.
private Dictionary dynamicRegisteredDeviceInfos = new Dictionary();
/// <summary>
/// Stops the app by raising the isStopped flag, which the FDC event handlers registered in Init check
/// before publishing anything.
/// </summary>
/// <returns>A completed task carrying true.</returns>
public Task Stop()
{
    // The flag was previously never set, so the handlers kept publishing after Stop had been called.
    this.isStopped = true;

    // TODO: the sub-devices are not logged out from the cloud here (DynamicDeviceOfflineAsync is not called)
    // and the MQTT clients are left open; decide whether Stop should do either.
    return Task.FromResult(true);
}
}
}