App.cs 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. using Applications.FDC;
  2. using Edge.Core.Processor;using Edge.Core.IndustryStandardInterface.Pump;
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.IO;
  7. using System.Linq;
  8. using System.Net.Mail;
  9. using RabbitMQ.Client;
  10. using RabbitMQ.Client.Events;
  11. using System.Text;
  12. using System.Threading.Tasks;
  13. using System.Timers;
  14. using Edge.Core.Database.Models;
  15. using Microsoft.Extensions.Configuration;
  16. using Newtonsoft.Json;
  17. using RabbitMQ.Client.Content;
  18. using RabbitMQ.Client.Exceptions;
  19. using RabbitMQ.Client.MessagePatterns;
  20. using RawRabbit;
  21. using RawRabbit.Configuration;
  22. using RawRabbit.Enrichers.GlobalExecutionId;
  23. using RawRabbit.Enrichers.MessageContext;
  24. using RawRabbit.Enrichers.MessageContext.Context;
  25. using RawRabbit.Instantiation;
  26. using Edge.Core.UniversalApi;
  27. using RawRabbit.Operations.Respond.Acknowledgement;
  28. namespace Applications.PumpInfoToRemote
  29. {
  30. public class App : IAppProcessor
  31. {
  32. private static IBusClient _client;
  33. internal static NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Application");
  34. private static FdcServerHostApp fdcServerHostApp = null;
  35. public string RabbitMsgQueueConfigName { get; set; }
  36. public string IncomingRpcKey { get; set; }
  37. public string IncomingRpcExchange { get; set; }
  38. public string OutgoingRpcKey { get; set; }
  39. public string OutgoingRpcExchange { get; set; }
  40. public string sn { get; set; }
  41. public string buId { get; set; }
  42. public bool ManuallyAuthorizePump { get; set; }
  43. private bool isStopped = false;
  44. public string MetaConfigName { get; set; }
  45. private bool startedFlag = false;
  46. //private RpcClient rpcClient;
  47. /// <summary>
  48. ///
  49. /// </summary>
  50. /// <param name="buId">this business unit that the fusion belongs to</param>
  51. public App(string rabbitMsgQueueConfigName,
  52. string outgoingRpcKey,
  53. string outgoingRpcExchange,
  54. string incomingRpcKey,
  55. string incomingRpcExchange,
  56. string buId,
  57. int manuallyAuthorizePump)
  58. {
  59. RabbitMsgQueueConfigName = rabbitMsgQueueConfigName;
  60. OutgoingRpcKey = outgoingRpcKey;
  61. OutgoingRpcExchange = outgoingRpcExchange;
  62. IncomingRpcKey = incomingRpcKey;
  63. IncomingRpcExchange = incomingRpcExchange;
  64. this.buId = buId;
  65. ManuallyAuthorizePump = manuallyAuthorizePump == 1 ? true : false;
  66. }
  67. public void Init(IEnumerable<IProcessor> processors)
  68. {
  69. fdcServerHostApp = processors.OfType<FdcServerHostApp>().FirstOrDefault();
  70. if (fdcServerHostApp == null)
  71. throw new ArgumentNullException("Can't find the FdcServerHostApp from processors");
  72. _client = RawRabbitFactory.CreateSingleton(new RawRabbitOptions
  73. {
  74. ClientConfiguration = new ConfigurationBuilder()
  75. .SetBasePath(Directory.GetCurrentDirectory())
  76. .AddJsonFile(RabbitMsgQueueConfigName)
  77. .Build()
  78. .Get<RawRabbitConfiguration>(),
  79. Plugins = p => p
  80. .UseGlobalExecutionId()
  81. .UseMessageContext<MessageContext>()
  82. });
  83. SetupLocalMqRpcServer().GetAwaiter().GetResult();
  84. //SetupLocalMqPublisher().GetAwaiter().GetResult();
  85. }
  86. private async Task SetupLocalMqPublisher()
  87. {
  88. fdcServerHostApp.OnStateChange += async (s, a) =>
  89. {
  90. if (this.isStopped) return;
  91. var pump = s as IFdcPumpController;
  92. //var notification = new NotificationRpc()
  93. //{
  94. // Method = "OnStateChange",
  95. // Params = new List<JsonRpcObjectMethodParameter>()
  96. // {
  97. // new JsonRpcObjectMethodParameter()
  98. // {
  99. // Name = "PumpId", Value = pump.PumpId.ToString()
  100. // },
  101. // new JsonRpcObjectMethodParameter()
  102. // {
  103. // Name = "NozzleId",
  104. // Value = a.StateChangedNozzles == null
  105. // ? ""
  106. // : a.StateChangedNozzles.FirstOrDefault()?.LogicalId.ToString() ?? ""
  107. // },
  108. // new JsonRpcObjectMethodParameter()
  109. // {
  110. // Name = "State", Value = a.NewPumpState.ToString()
  111. // },
  112. // new JsonRpcObjectMethodParameter()
  113. // {
  114. // Name = "BuId", Value = this.buId
  115. // },
  116. // }
  117. //};
  118. //SendToRemoteRpcServer(notification);
  119. if (!startedFlag)
  120. {
  121. startedFlag = true;
  122. await NotifyFccStarted();
  123. }
  124. };
  125. fdcServerHostApp.OnCurrentFuellingStatusChange += async (s, a) =>
  126. {
  127. if (this.isStopped) return;
  128. var pump = s as IFdcPumpController;
  129. if (a.Transaction.Finished == true)
  130. {
  131. var notification = new NotificationRpc()
  132. {
  133. Method = "OnCurrentFuellingStatusChange",
  134. Params = new List<JsonRpcObjectMethodParameter>()
  135. {
  136. new JsonRpcObjectMethodParameter()
  137. {
  138. Name = "PumpId", Value = pump.PumpId.ToString()
  139. },
  140. new JsonRpcObjectMethodParameter()
  141. {
  142. Name = "NozzleId", Value = a.Transaction.Nozzle.LogicalId.ToString()
  143. },
  144. new JsonRpcObjectMethodParameter()
  145. {
  146. Name = "State", Value = "Finished"
  147. },
  148. new JsonRpcObjectMethodParameter()
  149. {
  150. Name = "SequenceNumberGeneratedOnPhysicalPump",
  151. Value = a.Transaction.SequenceNumberGeneratedOnPhysicalPump.ToString()
  152. },
  153. new JsonRpcObjectMethodParameter()
  154. {
  155. Name = "Amount", Value = (a.Transaction.Amount / Math.Pow(10, pump.AmountDecimalDigits)).ToString()
  156. },
  157. new JsonRpcObjectMethodParameter()
  158. {
  159. Name = "Volume", Value = (a.Transaction.Volumn / Math.Pow(10, pump.VolumeDecimalDigits)).ToString()
  160. },
  161. new JsonRpcObjectMethodParameter()
  162. {
  163. Name = "Price", Value = (a.Transaction.Price / Math.Pow(10, pump.PriceDecimalDigits)).ToString()
  164. },
  165. new JsonRpcObjectMethodParameter()
  166. {
  167. Name = "VolumeTotalizer", Value = a.Transaction.VolumeTotalizer?.ToString()??"-1"
  168. },
  169. new JsonRpcObjectMethodParameter()
  170. {
  171. Name = "ReleaseToken", Value = a.ReleaseToken.ToString()
  172. },
  173. new JsonRpcObjectMethodParameter()
  174. {
  175. Name = "BuId", Value = this.buId
  176. }
  177. }
  178. };
  179. await SendToRemoteRpcServer(notification);
  180. }
  181. else if (ManuallyAuthorizePump)
  182. {
  183. var notification = new NotificationRpc()
  184. {
  185. Method = "OnCurrentFuellingStatusChange",
  186. Params = new List<JsonRpcObjectMethodParameter>()
  187. {
  188. new JsonRpcObjectMethodParameter()
  189. {
  190. Name = "PumpId", Value = pump.PumpId.ToString()
  191. },
  192. new JsonRpcObjectMethodParameter()
  193. {
  194. Name = "NozzleId", Value = a.Transaction.Nozzle.LogicalId.ToString()
  195. },
  196. new JsonRpcObjectMethodParameter()
  197. {
  198. Name = "State", Value = "Fueling"
  199. },
  200. new JsonRpcObjectMethodParameter()
  201. {
  202. Name = "Amount", Value = (a.Transaction.Amount / Math.Pow(10, pump.AmountDecimalDigits)).ToString()
  203. },
  204. new JsonRpcObjectMethodParameter()
  205. {
  206. Name = "Volume", Value = (a.Transaction.Volumn / Math.Pow(10, pump.VolumeDecimalDigits)).ToString()
  207. },
  208. new JsonRpcObjectMethodParameter()
  209. {
  210. Name = "Price", Value = (a.Transaction.Price / Math.Pow(10, pump.PriceDecimalDigits)).ToString()
  211. },
  212. new JsonRpcObjectMethodParameter()
  213. {
  214. Name = "BuId", Value = this.buId
  215. }
  216. }
  217. };
  218. await SendToRemoteRpcServer(notification);
  219. }
  220. };
  221. }
  222. private async Task SendToRemoteRpcServer(NotificationRpc notification)
  223. {
  224. string output = JsonConvert.SerializeObject(notification);
  225. logger.Info($" [x] Outgoing Request {output}");
  226. try
  227. {
  228. await _client.RequestAsync<NotificationRpc, NotificationResponse>(notification, ctx => ctx
  229. .UseRequestConfiguration(cfg => cfg
  230. .PublishRequest(p => p
  231. .OnDeclaredExchange(e => e
  232. .WithName(OutgoingRpcExchange)
  233. .WithAutoDelete())
  234. .WithRoutingKey(OutgoingRpcKey)
  235. .WithProperties(prop => prop.DeliveryMode = 1))
  236. .ConsumeResponse(r => r
  237. .Consume(c => c
  238. .WithRoutingKey($"outgoing_response_key{buId}"))
  239. .FromDeclaredQueue(q => q
  240. .WithName($"outgoing_response_queue{buId}")
  241. .WithAutoDelete())
  242. .OnDeclaredExchange(e => e
  243. .WithName($"outgoing_response_exchange{buId}")
  244. .WithAutoDelete()
  245. )
  246. )
  247. )).ConfigureAwait(false);
  248. logger.Info($" [x] Outgoing Response Received");
  249. }
  250. catch (Exception e)
  251. {
  252. logger.Error($" [x] Outgoing Exceptioned {e}");
  253. }
  254. }
  255. private async Task SetupLocalMqRpcServer()
  256. {
  257. try
  258. {
  259. await _client.RespondAsync<RequestRpc, ResponseRpc>(async request => await SendValuesThoughRpcAsync(request), ctx =>
  260. ctx.UseRespondConfiguration(cfg => cfg
  261. .Consume(c => c
  262. .WithRoutingKey($"{IncomingRpcKey}{buId}"))
  263. .FromDeclaredQueue(q => q
  264. .WithName($"incoming_queue{buId}")
  265. .WithAutoDelete())
  266. .OnDeclaredExchange(e => e
  267. .WithName($"{IncomingRpcExchange}{buId}")
  268. .WithAutoDelete()))).ConfigureAwait(false);
  269. }
  270. catch (Exception e)
  271. {
  272. logger.Error(e);
  273. }
  274. }
  275. private async Task<TypedAcknowlegement<ResponseRpc>> SendValuesThoughRpcAsync(RequestRpc request)
  276. {
  277. ResponseRpc response = null;
  278. try
  279. {
  280. string requestString = JsonConvert.SerializeObject(request);
  281. logger.Info($" [x] Incoming Request {requestString}");
  282. /*
  283. * AuthorizePump(int pumpId, double maxTrxAmount, double maxTrxVolume)
  284. */
  285. if (request.Method.ToLower() == "authorizepump"
  286. && request.Params != null && request.Params.Count == 3
  287. && request.Params.Any(p => p.Name == "PumpId") &&
  288. request.Params.Any(p => p.Name == "MaxTrxAmount") &&
  289. request.Params.Any(p => p.Name == "MaxTrxVolume"))
  290. {
  291. if (ManuallyAuthorizePump)
  292. {
  293. int pumpId = int.Parse(request.Params.First(p => p.Name == "PumpId").Value);
  294. double maxTrxAmount =
  295. double.Parse(request.Params.First(p => p.Name == "MaxTrxAmount").Value);
  296. double maxTrxVolume =
  297. double.Parse(request.Params.First(p => p.Name == "MaxTrxVolume").Value);
  298. var result = await fdcServerHostApp.AuthorizePumpAsync(pumpId, maxTrxAmount, maxTrxVolume);
  299. if (result)
  300. response = new ResponseRpc()
  301. { Id = request.Id, Result = "true" };
  302. else
  303. response = new ResponseRpc()
  304. { Id = request.Id, Result = "false" };
  305. }
  306. else
  307. {
  308. response = new ResponseRpc()
  309. { Id = request.Id, Result = "false" };
  310. }
  311. }
  312. else if (request.Method.ToLower() == "getavailabletrxs"
  313. && request.Params != null && request.Params.Count == 1
  314. && request.Params.Any(p => p.Name == "PumpId"))
  315. {
  316. int pumpId = int.Parse(request.Params.First(p => p.Name == "PumpId").Value);
  317. var result = await fdcServerHostApp.GetAvailableFuelSaleTrxsWithDetailsAsync(pumpId, -1, 30);
  318. var pumpController = fdcServerHostApp.GetFdcPumpController(pumpId);
  319. if (pumpController != null && result != null && result.Any())
  320. {
  321. var fuelSaleTransactionResponse = new List<FuelSaleTransactionResponse>();
  322. foreach (var trx in result)
  323. {
  324. fuelSaleTransactionResponse.Add(FuelSaleTransactionResponse.CreateInstance(trx, pumpController));
  325. }
  326. response = new ResponseRpc()
  327. { Id = request.Id, Result = JsonConvert.SerializeObject(fuelSaleTransactionResponse) };
  328. }
  329. else
  330. {
  331. response = new ResponseRpc()
  332. { Id = request.Id, Result = "" };
  333. }
  334. }
  335. else if (request.Method.ToLower() == "cleartrx"
  336. && request.Params != null && request.Params.Count == 3
  337. && request.Params.Any(p => p.Name == "PumpId")
  338. && request.Params.Any(p => p.Name == "SequenceNo")
  339. && request.Params.Any(p => p.Name == "ReleaseToken"))
  340. {
  341. int pumpId = int.Parse(request.Params.First(p => p.Name == "PumpId").Value);
  342. int sequenceNo = int.Parse(request.Params.First(p => p.Name == "SequenceNo").Value);
  343. int releaseToken = int.Parse(request.Params.First(p => p.Name == "ReleaseToken").Value);
  344. var result = await fdcServerHostApp.ClearFuelSaleTrxAndNotifyAllFdcClientsAsync(pumpId, sequenceNo.ToString(), releaseToken, "199");
  345. var pumpController = fdcServerHostApp.GetFdcPumpController(pumpId);
  346. if (pumpController != null && result != null)
  347. response = new ResponseRpc()
  348. { Id = request.Id, Result = JsonConvert.SerializeObject(FuelSaleTransactionResponse.CreateInstance(result, pumpController)) };
  349. else
  350. response = new ResponseRpc()
  351. { Id = request.Id, Result = "" };
  352. }
  353. else if (request.Method.ToLower() == "getnozzleproductconfig")
  354. {
  355. var result = fdcServerHostApp.GetNozzleExtraInfos();
  356. if (result != null)
  357. response = new ResponseRpc()
  358. { Id = request.Id, Result = JsonConvert.SerializeObject(result) };
  359. else
  360. response = new ResponseRpc()
  361. { Id = request.Id, Result = "" };
  362. }
  363. else if (request.Method.ToLower() == "locktrx"
  364. && request.Params != null && request.Params.Count == 3
  365. && request.Params.Any(p => p.Name == "PumpId")
  366. && request.Params.Any(p => p.Name == "SequenceNo")
  367. && request.Params.Any(p => p.Name == "ReleaseToken"))
  368. {
  369. int pumpId = int.Parse(request.Params.First(p => p.Name == "PumpId").Value);
  370. int sequenceNo = int.Parse(request.Params.First(p => p.Name == "SequenceNo").Value);
  371. int releaseToken = int.Parse(request.Params.First(p => p.Name == "ReleaseToken").Value);
  372. var pumpController = fdcServerHostApp.GetFdcPumpController(pumpId);
  373. var result = await fdcServerHostApp.LockFuelSaleTrxAndNotifyAllFdcClientsAsync(199, pumpId, sequenceNo, releaseToken);
  374. if (pumpController != null && result != null)
  375. response = new ResponseRpc()
  376. { Id = request.Id, Result = JsonConvert.SerializeObject(FuelSaleTransactionResponse.CreateInstance(result, pumpController)) };
  377. else
  378. response = new ResponseRpc()
  379. { Id = request.Id, Result = "" };
  380. }
  381. else if (request.Method.ToLower() == "unlocktrx"
  382. && request.Params != null && request.Params.Count == 3
  383. && request.Params.Any(p => p.Name == "PumpId")
  384. && request.Params.Any(p => p.Name == "SequenceNo")
  385. && request.Params.Any(p => p.Name == "ReleaseToken"))
  386. {
  387. int pumpId = int.Parse(request.Params.First(p => p.Name == "PumpId").Value);
  388. int sequenceNo = int.Parse(request.Params.First(p => p.Name == "SequenceNo").Value);
  389. int releaseToken = int.Parse(request.Params.First(p => p.Name == "ReleaseToken").Value);
  390. var pumpController = fdcServerHostApp.GetFdcPumpController(pumpId);
  391. var result = await fdcServerHostApp.UnlockFuelSaleTrxAndNotifyAllFdcClientsAsync(199, pumpId, sequenceNo, releaseToken);
  392. if (pumpController != null && result != null)
  393. response = new ResponseRpc()
  394. { Id = request.Id, Result = JsonConvert.SerializeObject(FuelSaleTransactionResponse.CreateInstance(result, pumpController)) };
  395. else
  396. response = new ResponseRpc()
  397. { Id = request.Id, Result = "" };
  398. }
  399. else if (request.Method.ToLower() == "getfueltrxs"
  400. && request.Params != null && request.Params.Count == 1
  401. && request.Params.Any(p => p.Name == "ReleaseToken"))
  402. {
  403. int releaseToken = int.Parse(request.Params.First(p => p.Name == "ReleaseToken").Value);
  404. var parameters = new List<ApiDataParameter>();
  405. parameters.Add(new ApiDataParameter
  406. {
  407. Name = "ReleaseToken",
  408. Value = releaseToken.ToString()
  409. });
  410. var apiData = new ApiData()
  411. {
  412. Parameters = parameters
  413. };
  414. var fuelSales = await fdcServerHostApp.GetFuelSaleTrxDetailsAsync(apiData);
  415. if (fuelSales != null)
  416. response = new ResponseRpc()
  417. { Id = request.Id, Result = JsonConvert.SerializeObject(fuelSales) };
  418. else
  419. response = new ResponseRpc()
  420. { Id = request.Id, Result = "" };
  421. }
  422. else
  423. {
  424. response = new ResponseRpc();
  425. }
  426. logger.Info($" [x] Incoming Response {response.Result}");
  427. }
  428. catch (Exception ex)
  429. {
  430. logger.Info($" [x] Incoming Exceptioned {ex}");
  431. }
  432. if (response == null)
  433. {
  434. response = new ResponseRpc();
  435. }
  436. return Respond.Ack(response);
  437. }
  438. public Task<bool> Start()
  439. {
  440. this.isStopped = false;
  441. return Task.FromResult(true);
  442. }
  443. public Task<bool> Stop()
  444. {
  445. this.isStopped = true;
  446. return Task.FromResult(true);
  447. }
  448. private async Task NotifyFccStarted()
  449. {
  450. var notification = new NotificationRpc()
  451. {
  452. Method = "OnFccStarted",
  453. Params = new List<JsonRpcObjectMethodParameter>()
  454. {
  455. new JsonRpcObjectMethodParameter()
  456. {
  457. Name = "BuId", Value = this.buId
  458. },
  459. }
  460. };
  461. await SendToRemoteRpcServer(notification);
  462. }
  463. }
  464. }