using Edge.Core.IndustryStandardInterface.NetworkController;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using static DeviceInfoToAliIotHubViaGateway.App;

namespace DeviceInfoToAliIotHubViaGateway
{
    /// <summary>
    /// Extension helpers: topic-string formatting, HMAC-SHA1 signing, waiting for a single MQTT
    /// message, and wrapping property values into the "thing.event.property.post" JSON envelope.
    /// </summary>
    public static class ExtMethod
    {
        /// <summary>
        /// Replaces the <c>{productKey}</c> and <c>{deviceName}</c> placeholders in a topic format string.
        /// </summary>
        /// <param name="topicFormatStr">Topic template containing the placeholders.</param>
        /// <param name="productKey">Value substituted for <c>{productKey}</c>.</param>
        /// <param name="deviceName">Value substituted for <c>{deviceName}</c>.</param>
        /// <returns>The topic string with both placeholders substituted.</returns>
        public static string TopicMaker(this string topicFormatStr, string productKey, string deviceName)
        {
            return topicFormatStr
                .Replace("{productKey}", productKey)
                .Replace("{deviceName}", deviceName);
        }

        /// <summary>
        /// Replaces the <c>{productKey}</c>, <c>{deviceName}</c> and <c>{tsl.event.identifier}</c>
        /// placeholders in a topic format string.
        /// </summary>
        /// <param name="topicFormatStr">Topic template containing the placeholders.</param>
        /// <param name="productKey">Value substituted for <c>{productKey}</c>.</param>
        /// <param name="deviceName">Value substituted for <c>{deviceName}</c>.</param>
        /// <param name="eventName">Value substituted for <c>{tsl.event.identifier}</c>.</param>
        /// <returns>The topic string with all three placeholders substituted.</returns>
        public static string TopicMaker(this string topicFormatStr, string productKey, string deviceName, string eventName)
        {
            return topicFormatStr
                .TopicMaker(productKey, deviceName)
                .Replace("{tsl.event.identifier}", eventName);
        }

        /// <summary>
        /// Computes the HMAC-SHA1 of <paramref name="target"/> with <paramref name="key"/>.
        /// </summary>
        /// <param name="target">Bytes to sign.</param>
        /// <param name="key">HMAC key.</param>
        /// <returns>The hash as a contiguous upper-case hexadecimal string.</returns>
        public static string SignWithHMacSHA1(this byte[] target, byte[] key)
        {
            using (var hmac = new HMACSHA1(key))
            {
                // BitConverter renders "AB-CD-EF"; dropping the dashes gives the same
                // upper-case hex the per-byte "X2" formatting produced, without quadratic concatenation.
                return BitConverter.ToString(hmac.ComputeHash(target)).Replace("-", string.Empty);
            }
        }

        /// <summary>
        /// Listens on the client's <c>OnMessageReceived</c> event until the first message on
        /// <paramref name="topic"/> arrives and converts its UTF-8 payload to <typeparamref name="T"/>
        /// (the raw text when T is <see cref="string"/>, otherwise a JSON deserialization).
        /// The event handler and the timeout timer are released on every completion path.
        /// </summary>
        /// <typeparam name="T">Type the payload is converted to.</typeparam>
        /// <param name="client">MQTT client whose messages are observed.</param>
        /// <param name="topic">Exact topic to match.</param>
        /// <param name="timeout">Milliseconds to wait; when elapsed the task completes with default(T), i.e. null.</param>
        /// <returns>
        /// A task yielding the converted message, null on timeout, or faulting with the exception
        /// thrown while decoding/deserializing the first matching message.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="client"/> is null.</exception>
        /// <exception cref="ArgumentException">
        /// Thrown by the <see cref="System.Timers.Timer"/> constructor when <paramref name="timeout"/> is not a valid interval.
        /// </exception>
        public static Task<T> WaitFor<T>(this IMqttClientNetworkController client, string topic, int timeout) where T : class
        {
            if (client == null)
            {
                throw new ArgumentNullException(nameof(client));
            }

            // Continuations must not run inline on the MQTT receive thread or the timer thread.
            var source = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

            // Built before subscribing so an invalid timeout throws without leaving a handler attached.
            var timer = new System.Timers.Timer(timeout) { AutoReset = false };

            EventHandler<OnMqttMessageReceivedEventArg> callback = null;
            callback = (sender, args) =>
            {
                if (args.Message.Topic != topic)
                {
                    return;
                }

                // Only the first matching message is of interest.
                client.OnMessageReceived -= callback;
                try
                {
                    var payload = Encoding.UTF8.GetString(args.Message.Message);
                    var result = typeof(T) == typeof(string)
                        ? payload as T
                        : JsonSerializer.Deserialize<T>(payload);

                    // TrySet* is a no-op when the timeout already completed the task.
                    source.TrySetResult(result);
                }
                catch (Exception ex)
                {
                    source.TrySetException(ex);
                }
            };

            timer.Elapsed += (sender, args) => source.TrySetResult(default(T));

            client.OnMessageReceived += callback;
            timer.Start();

            // Registered last, so it always runs after the subscribe/start above, whichever side
            // completed the task: detaches the handler (needed on timeout) and disposes the timer.
            source.Task.ContinueWith(
                completed =>
                {
                    client.OnMessageReceived -= callback;
                    timer.Dispose();
                },
                TaskScheduler.Default);

            return source.Task;
        }

        /// <summary>
        /// Subscribes to <paramref name="topic"/>, waits for the first message on it (see
        /// <c>WaitFor</c>), converts it to <typeparamref name="T"/> and finally unsubscribes.
        /// </summary>
        /// <typeparam name="T">Type the payload is converted to.</typeparam>
        /// <param name="client">MQTT client used to subscribe, listen and unsubscribe.</param>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="timeout">Milliseconds to wait for a message; on timeout the result is default(T), i.e. null.</param>
        /// <param name="qosLevel">QoS level passed to SubscribeAsync.</param>
        /// <returns>The converted message, or null when the wait timed out.</returns>
        /// <exception cref="InvalidOperationException">SubscribeAsync returned false.</exception>
        public static async Task<T> SubscribeAndWaitForThenUnsubscribe<T>(this IMqttClientNetworkController client, string topic,
            int timeout, Mqtt_QosLevel qosLevel) where T : class
        {
            T result;
            try
            {
                var subscribed = await client.SubscribeAsync(0, topic, qosLevel);
                if (!subscribed)
                {
                    throw new InvalidOperationException("SubscribeAsync failed with returned with false");
                }

                result = await client.WaitFor<T>(topic, timeout);
            }
            catch
            {
                try
                {
                    await client.UnsubscribeAsync(0, topic);
                }
                catch
                {
                    // Best-effort cleanup: a failing unsubscribe must not hide the primary failure.
                }

                throw;
            }

            // On the success path an unsubscribe failure is still reported to the caller.
            await client.UnsubscribeAsync(0, topic);
            return result;
        }

        /// <summary>
        /// Wraps a single property name/value pair into the property-post JSON envelope.
        /// </summary>
        /// <param name="propertyName">Property name used as the JSON key.</param>
        /// <param name="value">Property value; emitted as a JSON string (or null).</param>
        /// <param name="requestId">Value written to the "id" field.</param>
        /// <returns>The JSON request text.</returns>
        public static string WrapToPropertyChangedInLocalJson(this string propertyName, string value, int requestId)
        {
            var single = new[] { new KeyValuePair<string, object>(propertyName, value) };
            return single.WrapToPropertiesChangedInLocalJson(requestId);
        }

        /// <summary>
        /// Wraps property name/value pairs into a JSON request of the form
        /// <c>{ "id": "…", "version": "1.0", "params": { "name": {"value": …,"time": …} }, "method": "thing.event.property.post"}</c>.
        /// String values are quoted and escaped, booleans become true/false, null becomes null, and
        /// other values are written unquoted using invariant-culture formatting. "time" is
        /// <c>DateTime.Now.Ticks</c> read per property. An empty sequence yields an empty "params" object.
        /// </summary>
        /// <param name="propertyNameAndValues">Property names and their current values.</param>
        /// <param name="requestId">Value written to the "id" field.</param>
        /// <returns>The JSON request text.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="propertyNameAndValues"/> is null.</exception>
        public static string WrapToPropertiesChangedInLocalJson(this IEnumerable<KeyValuePair<string, object>> propertyNameAndValues, int requestId)
        {
            if (propertyNameAndValues == null)
            {
                throw new ArgumentNullException(nameof(propertyNameAndValues));
            }

            var invariant = System.Globalization.CultureInfo.InvariantCulture;
            var entries = new List<string>();
            foreach (var nv in propertyNameAndValues)
            {
                entries.Add(
                    "\"" + EscapeJsonText(nv.Key) + "\": {\"value\": " + ToJsonLiteral(nv.Value)
                    + ",\"time\": " + DateTime.Now.Ticks.ToString(invariant) + "}");
            }

            // Joining (instead of trimming a trailing comma) keeps the output valid when there are no entries.
            return "{ \"id\": \"" + requestId.ToString(invariant) + "\", \"version\": \"1.0\", \"params\": {"
                + string.Join(",", entries)
                + "}, \"method\": \"thing.event.property.post\"}";
        }

        /// <summary>Renders a property value as a JSON literal (see WrapToPropertiesChangedInLocalJson).</summary>
        private static string ToJsonLiteral(object value)
        {
            switch (value)
            {
                case null:
                    return "null";
                case string text:
                    return "\"" + EscapeJsonText(text) + "\"";
                case bool flag:
                    // bool.ToString() yields "True"/"False", which is not valid JSON.
                    return flag ? "true" : "false";
                case IFormattable formattable:
                    // Invariant culture keeps '.' as the decimal separator regardless of machine locale.
                    return formattable.ToString(null, System.Globalization.CultureInfo.InvariantCulture);
                default:
                    return value.ToString();
            }
        }

        /// <summary>Escapes text for use inside a JSON string literal (without the surrounding quotes).</summary>
        private static string EscapeJsonText(string text)
        {
            // The relaxed encoder escapes quotes, backslashes and control characters but leaves
            // other characters (including non-ASCII) untouched, so ordinary values are emitted unchanged.
            return JsonEncodedText.Encode(
                text ?? string.Empty,
                System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping).ToString();
        }
    }
}