ConnectionController.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. using Edge.Core.Parser.BinaryParser.Util;
  2. using Edge.Core.Processor;
  3. using Edge.Core.UniversalApi;
  4. using Microsoft.Extensions.Logging;
  5. using Microsoft.Extensions.Logging.Abstractions;
  6. using PetroChinaOnlineWatchPlugin.MessageEntity;
  7. using PetroChinaOnlineWatchPlugin.MessageEntity.Outgoing;
  8. using System;
  9. using System.Collections.Generic;
  10. using System.Net;
  11. using System.Net.Sockets;
  12. using System.Text;
  13. namespace PetroChinaOnlineWatchPlugin
  14. {
  15. internal class ConnectionController
  16. {
  17. public int TcpListenPort { get { return 6000; } }
  18. public TcpClient TcpClientInstance { get { return tcpClient; } }
  19. public static ConnectionController Default => instance;
  20. private static readonly ConnectionController instance = new ConnectionController();
  21. private ILogger logger = NullLogger.Instance;
  22. private IProcessor source;
  23. private UniversalApiHub universalApiHub;
  24. private TcpListener tcpListener;
  25. private TcpClient tcpClient = new TcpClient();
  26. //消息长度不应大于228字节,以减小对应用缓冲区大小的要求。
  27. private byte[] tcpServerReceivedBuffer = new byte[1024];
  28. private byte[] tcpClientReceivedBuffer = new byte[1024];
  29. private AppConfigV1 appConfig;
  30. private bool hasSentConfig = false;
  31. private int environmentPushingInterval = -1;
  32. private const string OnUploadOnlineMonitorDataStateChangeEventName = "OnUploadOnlineMonitorDataStateChange";
  33. public bool Start(AppConfigV1 appConfig, ILogger logger, IProcessor source, UniversalApiHub universalApiHub)
  34. {
  35. this.appConfig = appConfig;
  36. this.logger = logger;
  37. this.source = source;
  38. this.universalApiHub = universalApiHub;
  39. HeartBeatUdpProxy.Default.OnDataReceived += Udp_DataReceived;
  40. logger.LogInformation($"TcpListener is listening on port, {TcpListenPort}");
  41. tcpListener = new TcpListener(new IPEndPoint(IPAddress.Any, TcpListenPort));
  42. tcpListener.Start();
  43. tcpListener.BeginAcceptTcpClient(AcceptCallback, null);
  44. return true;
  45. }
  46. public void SendUnsolicitedMessage()
  47. {
  48. try
  49. {
  50. if (tcpClient.Client == null || !tcpClient.Connected)
  51. {
  52. logger.LogDebug($"在线监测所有数据上传失败:无法连接到远程服务器");
  53. return;
  54. }
  55. dynamic dataList = null;
  56. var tempDic = new Dictionary<string, object>();
  57. var span = DateTime.Now - DateTime.Today;
  58. //如果配置数据无变更,每日零时和中午12点上传该配置信息。
  59. if (span.TotalSeconds < 8 || (span.TotalSeconds > 43200 && span.TotalSeconds < 43208))
  60. {
  61. if (!hasSentConfig)
  62. {
  63. dataList = SqlClientHelper.ReadOnlineMonitorData(appConfig, logger, ref tempDic, DbAddressType.Configure);
  64. if (dataList != null)
  65. {
  66. // CF_ID 01H, CF_Message
  67. var configMessage = new AnswerWriteUnsolicitedMessageOut(appConfig, 0x01, dataList);
  68. SendOnlineMonitorData("配置数据", configMessage);
  69. hasSentConfig = true;
  70. }
  71. }
  72. }
  73. else if (hasSentConfig)
  74. {
  75. hasSentConfig = false;
  76. }
  77. if (environmentPushingInterval < 0)
  78. {
  79. //环境实时,数据采集时间间隔不大于 30 秒
  80. environmentPushingInterval = 29;
  81. dataList = SqlClientHelper.ReadOnlineMonitorData(appConfig, logger, ref tempDic, DbAddressType.EnvironmentData);
  82. if (dataList != null)
  83. {
  84. var environmentRealtime = new AnswerWriteUnsolicitedMessageOut(appConfig, 0x11, dataList);
  85. SendOnlineMonitorData("环境数据", environmentRealtime);
  86. SqlClientHelper.UpdateOnlineMonitorData(appConfig, logger);
  87. }
  88. }
  89. else
  90. {
  91. environmentPushingInterval -= 1;
  92. }
  93. dataList = SqlClientHelper.ReadOnlineMonitorData(appConfig, logger, ref tempDic, DbAddressType.PumpData);
  94. if (dataList != null)
  95. {
  96. var pumpRealtimeMessage = new AnswerWriteUnsolicitedMessageOut(appConfig, 0x21, dataList);
  97. SendOnlineMonitorData("加油枪数据", pumpRealtimeMessage);
  98. SqlClientHelper.UpdateOnlineMonitorData(appConfig, logger);
  99. }
  100. dataList = SqlClientHelper.ReadOnlineMonitorData(appConfig, logger, ref tempDic, DbAddressType.FaultData);
  101. if (dataList != null)
  102. {
  103. var faultMessage = new AnswerWriteUnsolicitedMessageOut(appConfig, 0x41, dataList);
  104. SendOnlineMonitorData("报警_故障数据", faultMessage);
  105. SqlClientHelper.UpdateOnlineMonitorData(appConfig, logger);
  106. }
  107. }
  108. catch (Exception ex)
  109. {
  110. logger.LogError($"Exception in SendUnsolicitedMessage(...):\r\n{ex}");
  111. }
  112. }
  113. private void SendOnlineMonitorData(string dataTypeName, MessageBase sendMessage)
  114. {
  115. try
  116. {
  117. if (tcpClient.Client == null || !tcpClient.Connected)
  118. {
  119. logger.LogError($"{dataTypeName}上传失败:无法连接到远程服务器");
  120. return;
  121. }
  122. byte[] sendBuffer = Parser.Default.Serialize(sendMessage);
  123. logger.LogDebug($"Tcp client sent data, {sendMessage.ToString()} in SendOnlineMonitorData");
  124. tcpClient.Client.Send(sendBuffer);
  125. logger.LogDebug($"Tcp client sent data, 0x{sendBuffer.ToHexLogString()} in Udp_DataReceived");
  126. var r = universalApiHub.FireEvent(source,
  127. OnUploadOnlineMonitorDataStateChangeEventName, new UploadState() { DataTypeName = dataTypeName });
  128. }
  129. catch (Exception ex)
  130. {
  131. logger.LogDebug($"Exception in SendOnlineMonitorData(...):\r\n{ex}");
  132. tcpClient.Close();
  133. }
  134. }
  135. private void Udp_DataReceived(object sender, EventArgs ar)
  136. {
  137. try
  138. {
  139. var udpar = ar as UdpDataEventArgs;
  140. logger.LogDebug($"TcpClient connect to {udpar.IpAddress}, port {udpar.Port}");
  141. tcpClient = new TcpClient();
  142. tcpClient.Connect(udpar.IpAddress, udpar.Port);
  143. tcpClient.Client.BeginReceive(tcpClientReceivedBuffer, 0, tcpClientReceivedBuffer.Length, SocketFlags.None, TcpClientDataReceived, tcpClient);
  144. var tempDic = new Dictionary<string, object>();
  145. var dataList = SqlClientHelper.ReadOnlineMonitorData(appConfig, logger, ref tempDic, DbAddressType.Configure);
  146. if (dataList != null)
  147. {
  148. // CF_ID 01H, CF_Message
  149. var configMessage = new AnswerWriteUnsolicitedMessageOut(appConfig, 0x01, dataList);
  150. SendOnlineMonitorData("配置数据", configMessage);
  151. }
  152. }
  153. catch (Exception ex)
  154. {
  155. logger.LogError($"Exception in Udp_DataReceived(...):\r\n{ex}");
  156. }
  157. }
  158. private void TcpClientDataReceived(IAsyncResult ar)
  159. {
  160. TcpClient client = null;
  161. try
  162. {
  163. client = ar.AsyncState as TcpClient;
  164. int number = client.Client.EndReceive(ar);
  165. ar.AsyncWaitHandle.Close();
  166. var remoteEndPoint = client.Client.RemoteEndPoint as IPEndPoint;
  167. int messageLengthIndex = 6;
  168. for (int offset = 0; offset < number;)
  169. {
  170. var temp = new byte[] { tcpClientReceivedBuffer[offset + messageLengthIndex + 1], tcpClientReceivedBuffer[offset + messageLengthIndex] };
  171. offset = messageLengthIndex + 2 + BitConverter.ToUInt16(temp, 0);
  172. byte[] messageBuffer = new byte[offset];
  173. for (int i = 0; i < offset; i++)
  174. {
  175. messageBuffer[i] = tcpClientReceivedBuffer[i];
  176. }
  177. logger.LogDebug($"Tcp client received data, 0x{messageBuffer.ToHexLogString()}, from IpAddress:Port, {remoteEndPoint.Address}:{remoteEndPoint.Port}");
  178. dynamic receivedMessage = Parser.Default.Deserialize(messageBuffer);
  179. if (receivedMessage.MessageType == MessageType.IFSF_MESSAGE_TYPE_ACK)
  180. {
  181. logger.LogDebug($"Tcp client Incoming--->: {receivedMessage.ToString()}");
  182. //ACK 0x00 肯定确认,收到数据
  183. if (receivedMessage.MessageAck == 0x00)
  184. {
  185. logger.LogDebug($"SqlClientHelper ");
  186. //SqlClientHelper.UpdateOnlineMonitorData(appConfig, logger);
  187. }
  188. }
  189. }
  190. if (number > 0)
  191. {
  192. tcpClientReceivedBuffer = new byte[tcpClientReceivedBuffer.Length];
  193. client.Client.BeginReceive(tcpClientReceivedBuffer, 0, tcpClientReceivedBuffer.Length, SocketFlags.None, TcpClientDataReceived, client);
  194. }
  195. else
  196. {
  197. logger.LogDebug($"Tcp client received 0 count data which indicates the connection is broken in TcpClientDataReceived!");
  198. client.Close();
  199. }
  200. }
  201. catch (Exception ex)
  202. {
  203. logger.LogError($"Exception in TcpClientDataReceived(...):\r\n{ex}");
  204. client.Close();
  205. }
  206. }
  207. private void TcpServerDataReceived(IAsyncResult ar)
  208. {
  209. TcpClient client = null;
  210. try
  211. {
  212. client = ar.AsyncState as TcpClient;
  213. int number = client.Client.EndReceive(ar);
  214. ar.AsyncWaitHandle.Close();
  215. var remoteEndPoint = client.Client.RemoteEndPoint as IPEndPoint;
  216. logger.LogDebug($"Tcp server received {number} bytes, from IpAddress:Port, {remoteEndPoint.Address}:{remoteEndPoint.Port}");
  217. int messageLengthIndex = 6;
  218. for (int offset = 0; offset < number;)
  219. {
  220. var temp = new byte[] { tcpServerReceivedBuffer[offset + messageLengthIndex + 1], tcpServerReceivedBuffer[offset + messageLengthIndex] };
  221. offset = messageLengthIndex + 2 + BitConverter.ToUInt16(temp, 0);
  222. byte[] messageBuffer = new byte[offset];
  223. for (int i = 0; i < offset; i++)
  224. {
  225. messageBuffer[i] = tcpServerReceivedBuffer[i];
  226. }
  227. logger.LogDebug($"Tcp server received data, 0x{messageBuffer.ToHexLogString()}");
  228. dynamic receivedMessage = Parser.Default.Deserialize(messageBuffer);
  229. logger.LogDebug($"Tcp server Incoming--->: {receivedMessage.ToString()}");
  230. if (receivedMessage.MessageType == MessageType.IFSF_MESSAGE_TYPE_READ)
  231. {
  232. //控制设备读取设备的心跳间隔
  233. if (receivedMessage.DatabaseAddress[0] == 0x00 && receivedMessage.DataIdentifier[0] == 0x04)
  234. {
  235. var dataList = new List<IfsfMessageDataOut>() { new Bin8Message(0x04, 11) };
  236. if (dataList != null)
  237. {
  238. // 心跳间隔为11秒
  239. var configMessage = new AnswerWriteUnsolicitedMessageOut(appConfig, receivedMessage.MessageToken, receivedMessage.DatabaseAddress, dataList);
  240. byte[] sendBuffer = Parser.Default.Serialize(configMessage);
  241. logger.LogDebug($"Tcp server sent data, {configMessage.ToString()} in TcpServerDataReceived");
  242. client.Client.Send(sendBuffer);
  243. logger.LogDebug($"Tcp server sent data, 0x{sendBuffer.ToHexLogString()} in TcpServerDataReceived");
  244. }
  245. }
  246. }
  247. }
  248. if (number > 0)
  249. {
  250. tcpServerReceivedBuffer = new byte[tcpServerReceivedBuffer.Length];
  251. client.Client.BeginReceive(tcpServerReceivedBuffer, 0, tcpServerReceivedBuffer.Length, SocketFlags.None, TcpServerDataReceived, client);
  252. }
  253. else
  254. {
  255. logger.LogDebug($"Tcp server received 0 count data which indicates the connection is broken in TcpServerDataReceived!");
  256. client.Close();
  257. }
  258. }
  259. catch (Exception ex)
  260. {
  261. logger.LogError($"Exception in TcpServerDataReceived(...):\r\n{ex}");
  262. client.Close();
  263. }
  264. }
  265. private void AcceptCallback(IAsyncResult ar)
  266. {
  267. try
  268. {
  269. var client = tcpListener.EndAcceptTcpClient(ar);
  270. client.Client.BeginReceive(tcpServerReceivedBuffer, 0, tcpServerReceivedBuffer.Length, SocketFlags.None, TcpServerDataReceived, client);
  271. var remoteEndPoint = client.Client.RemoteEndPoint as IPEndPoint;
  272. logger.LogDebug($"Accepted a remote tcp client from IpAddress:Port, {remoteEndPoint.Address}:{remoteEndPoint.Port}");
  273. tcpListener.BeginAcceptTcpClient(AcceptCallback, null);
  274. }
  275. catch (Exception ex)
  276. {
  277. logger.LogError($"Exception in AcceptCallback(...):\r\n{ex}");
  278. }
  279. }
  280. private decimal BytesToBcd(List<byte> data)
  281. {
  282. var strBcd = new StringBuilder();
  283. int pointIndex = data[0];
  284. for (int i = 1; i < data.Count; i++)
  285. {
  286. strBcd.Append(data[i]);
  287. if (i == pointIndex)
  288. strBcd.Append('.');
  289. }
  290. return decimal.Parse(strBcd.ToString());
  291. }
  292. }
  293. public class UploadState
  294. {
  295. public string DataTypeName { get; set; }
  296. /// <summary>
  297. /// each vr watching nozzle is a physical pump nozzle, here is the bind info, will be used to draw UI.
  298. /// </summary>
  299. public int SiteLevelNozzleId { get; set; }
  300. }
  301. }