ExtMethod.cs 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. using Edge.Core.IndustryStandardInterface.NetworkController;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Security.Cryptography;
  6. using System.Text;
  7. using System.Text.Json;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. using static DeviceInfoToAliIotHubViaGateway.App;
  11. namespace DeviceInfoToAliIotHubViaGateway
  12. {
  13. public static class ExtMethod
  14. {
  15. public static string TopicMaker(this string topicFormatStr, string productKey, string deviceName)
  16. {
  17. return topicFormatStr.Replace("{productKey}", productKey).Replace("{deviceName}", deviceName);
  18. }
  19. public static string TopicMaker(this string topicFormatStr, string productKey, string deviceName, string eventName)
  20. {
  21. return topicFormatStr.Replace("{productKey}", productKey).Replace("{deviceName}", deviceName).Replace("{tsl.event.identifier}", eventName);
  22. }
  23. public static string SignWithHMacSHA1(this byte[] target, byte[] key)
  24. {
  25. using (HMACSHA1 hmac = new HMACSHA1(key))
  26. {
  27. var hash = hmac.ComputeHash(target);
  28. return hash.Select(h => h.ToString("X2")).Aggregate((acc, n) => acc + n);
  29. }
  30. }
  31. /// <summary>
  32. /// Listen on IMqttClientNetworkController's event of OnMessageReceived, until the ->first<- message came,
  33. /// then try Deserialize this message to object T.
  34. /// or, when timedout, a default(T) will return.
  35. /// </summary>
  36. /// <typeparam name="T"></typeparam>
  37. /// <param name="client"></param>
  38. /// <param name="topic"></param>
  39. /// <param name="timeout">timedout will return a default(T)</param>
  40. /// <returns></returns>
  41. public static Task<T> WaitFor<T>(this IMqttClientNetworkController client, string topic, int timeout) where T : class
  42. {
  43. int isResponseGot = 0;
  44. var source = new TaskCompletionSource<T>();
  45. EventHandler<OnMqttMessageReceivedEventArg> callback = null;
  46. callback = (a, b) =>
  47. {
  48. if (b.Message.Topic == topic)
  49. {
  50. try
  51. {
  52. if (typeof(T) == typeof(System.String))
  53. {
  54. if (Interlocked.CompareExchange(ref isResponseGot, 1, 0) == 0)
  55. {
  56. var fff = Encoding.UTF8.GetString(b.Message.Message) as T;
  57. source.SetResult(fff);
  58. }
  59. }
  60. else
  61. {
  62. var response = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(b.Message.Message));
  63. if (Interlocked.CompareExchange(ref isResponseGot, 1, 0) == 0)
  64. {
  65. source.SetResult(response);
  66. }
  67. }
  68. }
  69. catch (Exception exx)
  70. {
  71. if (Interlocked.CompareExchange(ref isResponseGot, 1, 0) == 0)
  72. {
  73. source.SetException(exx);
  74. }
  75. }
  76. finally
  77. {
  78. client.OnMessageReceived -= callback;
  79. }
  80. }
  81. };
  82. client.OnMessageReceived += callback;
  83. var _ = new System.Timers.Timer(timeout);
  84. _.Elapsed += (__, ___) =>
  85. {
  86. _.Stop();
  87. if (Interlocked.CompareExchange(ref isResponseGot, 1, 0) == 0)
  88. {
  89. source.SetResult(default(T));
  90. }
  91. };
  92. _.Start();
  93. return source.Task;
  94. }
  95. //public static async Task<T> SubscribeAndWaitForAndUnsubscribe<T>(this IMqttClientNetworkController client, string topic,
  96. // int timeout, Mqtt_QosLevel qosLevel) where T : class
  97. //{
  98. // try
  99. // {
  100. // var subResult = await client.SubscribeAsync(0, topic, qosLevel);
  101. // if (subResult)
  102. // {
  103. // var catchUp = client.WaitFor<T>(topic, timeout);
  104. // var r = await catchUp;
  105. // await client.UnsubscribeAsync(0, topic);
  106. // return r;
  107. // }
  108. // }
  109. // catch (Exception exx)
  110. // {
  111. // try
  112. // {
  113. // await client.UnsubscribeAsync(0, topic);
  114. // }
  115. // catch { }
  116. // var source0 = new TaskCompletionSource<T>();
  117. // source0.SetException(exx);
  118. // return await source0.Task;
  119. // }
  120. // try
  121. // {
  122. // await client.UnsubscribeAsync(0, topic);
  123. // }
  124. // catch { }
  125. // var source1 = new TaskCompletionSource<T>();
  126. // source1.SetException(new InvalidOperationException("SubscribeAsync failed with returned with false"));
  127. // return await source1.Task;
  128. //}
  129. /// <summary>
  130. /// Subscribes on a topic until the ->first<- message came, and then try Deserialize this message to object T.
  131. /// finally, will Unsubscribe from topic.
  132. /// or, when timedout, a default(T) will return.
  133. /// </summary>
  134. /// <typeparam name="T"></typeparam>
  135. /// <param name="client"></param>
  136. /// <param name="topic"></param>
  137. /// <param name="timeout">timedout will return a default(T)</param>
  138. /// <returns></returns>
  139. public static async Task<T> SubscribeAndWaitForThenUnsubscribe<T>(this IMqttClientNetworkController client, string topic,
  140. int timeout, Mqtt_QosLevel qosLevel) where T : class
  141. {
  142. try
  143. {
  144. var subResult = await client.SubscribeAsync(0, topic, qosLevel);
  145. if (subResult)
  146. return await client.WaitFor<T>(topic, timeout);
  147. else
  148. {
  149. var source1 = new TaskCompletionSource<T>();
  150. source1.SetException(new InvalidOperationException("SubscribeAsync failed with returned with false"));
  151. return await source1.Task;
  152. }
  153. }
  154. catch (Exception exx)
  155. {
  156. var source0 = new TaskCompletionSource<T>();
  157. source0.SetException(exx);
  158. return await source0.Task;
  159. }
  160. finally
  161. {
  162. await client.UnsubscribeAsync(0, topic);
  163. }
  164. }
  165. public static string WrapToPropertyChangedInLocalJson(this string propertyName, string value, int requestId)
  166. {
  167. return new List<KeyValuePair<string, object>>(){
  168. new KeyValuePair<string, object>(propertyName,value)}.WrapToPropertiesChangedInLocalJson(requestId);
  169. }
  170. //public static string WrapToPropertiesChangedInLocalJson(this IEnumerable<KeyValuePair<string, object>> propertyNameAndValues, int requestId)
  171. //{
  172. // var request = "{ \"id\": \"" + requestId + "\", \"version\": \"1.0\", \"params\": {";
  173. // foreach (var nv in propertyNameAndValues)
  174. // {
  175. // string property_CurState_JsonStr = "\"" + nv.Key + "\": {\"value\": \"" + nv.Value + "\",\"time\": " + DateTime.Now.Ticks + "}";
  176. // request += property_CurState_JsonStr + ",";
  177. // }
  178. // // remove last ,
  179. // request = request.Substring(0, request.Length - 1);
  180. // request += "}, \"method\": \"thing.event.property.post\"}";
  181. // return request;
  182. //}
  183. public static string WrapToPropertiesChangedInLocalJson(this IEnumerable<KeyValuePair<string, object>> propertyNameAndValues, int requestId)
  184. {
  185. var request = "{ \"id\": \"" + requestId + "\", \"version\": \"1.0\", \"params\": {";
  186. foreach (var nv in propertyNameAndValues)
  187. {
  188. string variableValue = nv.Value.ToString();
  189. if (nv.Value.GetType() == typeof(System.String))
  190. variableValue = "\"" + variableValue + "\"";
  191. string property_CurState_JsonStr = "\"" + nv.Key + "\": {\"value\": " + variableValue + ",\"time\": " + DateTime.Now.Ticks + "}";
  192. request += property_CurState_JsonStr + ",";
  193. }
  194. // remove last ,
  195. request = request.Substring(0, request.Length - 1);
  196. request += "}, \"method\": \"thing.event.property.post\"}";
  197. return request;
  198. }
  199. }
  200. }