using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net.Mail;
using System.Text;
using System.Threading.Tasks;
using System.Timers;
using Applications.FDC;
using Edge.Core.Database.Models;
using Edge.Core.IndustryStandardInterface.Pump;
using Edge.Core.Processor;
using Edge.Core.UniversalApi;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Content;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.MessagePatterns;
using RawRabbit;
using RawRabbit.Configuration;
using RawRabbit.Enrichers.GlobalExecutionId;
using RawRabbit.Enrichers.MessageContext;
using RawRabbit.Enrichers.MessageContext.Context;
using RawRabbit.Instantiation;
using RawRabbit.Operations.Respond.Acknowledgement;

namespace Applications.PumpInfoToRemote
{
    /// <summary>
    /// Bridges the local <see cref="FdcServerHostApp"/> and a remote party over RabbitMQ (RawRabbit).
    /// Incoming RPC requests are dispatched to the FDC server host app; pump notifications can be
    /// pushed out through <see cref="SendToRemoteRpcServer"/>.
    /// </summary>
    public class App : IAppProcessor
    {
        private static IBusClient _client;
        internal static NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Application");
        private static FdcServerHostApp fdcServerHostApp = null;

        public string RabbitMsgQueueConfigName { get; set; }
        public string IncomingRpcKey { get; set; }
        public string IncomingRpcExchange { get; set; }
        public string OutgoingRpcKey { get; set; }
        public string OutgoingRpcExchange { get; set; }
        public string sn { get; set; }
        public string buId { get; set; }
        public bool ManuallyAuthorizePump { get; set; }
        private bool isStopped = false;
        public string MetaConfigName { get; set; }
        private bool startedFlag = false;
        //private RpcClient rpcClient;

        /// <summary>
        /// Creates the processor and stores its message-queue settings.
        /// </summary>
        /// <param name="rabbitMsgQueueConfigName">JSON file (relative to the current directory) holding the RawRabbit client configuration.</param>
        /// <param name="outgoingRpcKey">Routing key used when publishing outgoing notifications.</param>
        /// <param name="outgoingRpcExchange">Exchange used when publishing outgoing notifications.</param>
        /// <param name="incomingRpcKey">Routing key prefix for incoming requests; <paramref name="buId"/> is appended to it.</param>
        /// <param name="incomingRpcExchange">Exchange name prefix for incoming requests; <paramref name="buId"/> is appended to it.</param>
        /// <param name="buId">this business unit that the fusion belongs to</param>
        /// <param name="manuallyAuthorizePump">1 enables manual pump authorization; any other value disables it.</param>
        public App(string rabbitMsgQueueConfigName, string outgoingRpcKey, string outgoingRpcExchange,
            string incomingRpcKey, string incomingRpcExchange, string buId, int manuallyAuthorizePump)
        {
            RabbitMsgQueueConfigName = rabbitMsgQueueConfigName;
            OutgoingRpcKey = outgoingRpcKey;
            OutgoingRpcExchange = outgoingRpcExchange;
            IncomingRpcKey = incomingRpcKey;
            IncomingRpcExchange = incomingRpcExchange;
            this.buId = buId;
            ManuallyAuthorizePump = manuallyAuthorizePump == 1;
        }

        /// <summary>
        /// Locates the <see cref="FdcServerHostApp"/> among <paramref name="processors"/>, creates the
        /// RawRabbit client from <see cref="RabbitMsgQueueConfigName"/> and registers the incoming RPC responder.
        /// </summary>
        /// <param name="processors">All processors known to the host.</param>
        /// <exception cref="ArgumentNullException">No <see cref="FdcServerHostApp"/> is present in <paramref name="processors"/>.</exception>
        // NOTE(review): the element type was lost in this copy of the file; IProcessor is assumed to be
        // what IAppProcessor.Init declares - confirm against the interface.
        public void Init(IEnumerable<IProcessor> processors)
        {
            fdcServerHostApp = processors.OfType<FdcServerHostApp>().FirstOrDefault();
            if (fdcServerHostApp == null)
            {
                // Exception type kept for callers; the text is now the message instead of the parameter name.
                throw new ArgumentNullException(nameof(processors), "Can't find the FdcServerHostApp from processors");
            }

            _client = RawRabbitFactory.CreateSingleton(new RawRabbitOptions
            {
                ClientConfiguration = new ConfigurationBuilder()
                    .SetBasePath(Directory.GetCurrentDirectory())
                    .AddJsonFile(RabbitMsgQueueConfigName)
                    .Build()
                    .Get<RawRabbitConfiguration>(),
                Plugins = p => p
                    .UseGlobalExecutionId()
                    .UseMessageContext()
            });

            SetupLocalMqRpcServer().GetAwaiter().GetResult();
            //SetupLocalMqPublisher().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Subscribes to pump events of the FDC server host app and forwards them as notifications.
        /// Currently not invoked: the call in <see cref="Init"/> is commented out.
        /// </summary>
        private Task SetupLocalMqPublisher()
        {
            fdcServerHostApp.OnStateChange += async (s, a) =>
            {
                if (this.isStopped)
                {
                    return;
                }

                //var pump = s as IFdcPumpController;
                //var notification = new NotificationRpc()
                //{
                //    Method = "OnStateChange",
                //    Params = new List<JsonRpcObjectMethodParameter>()
                //    {
                //        new JsonRpcObjectMethodParameter()
                //        {
                //            Name = "PumpId", Value = pump.PumpId.ToString()
                //        },
                //        new JsonRpcObjectMethodParameter()
                //        {
                //            Name = "NozzleId",
                //            Value = a.StateChangedNozzles == null
                //                ? ""
                //                : a.StateChangedNozzles.FirstOrDefault()?.LogicalId.ToString() ?? ""
                //        },
                //        new JsonRpcObjectMethodParameter()
                //        {
                //            Name = "State", Value = a.NewPumpState.ToString()
                //        },
                //        new JsonRpcObjectMethodParameter()
                //        {
                //            Name = "BuId", Value = this.buId
                //        },
                //    }
                //};
                //SendToRemoteRpcServer(notification);

                // An exception escaping an async event handler cannot be observed by the publisher,
                // so it is caught and logged here.
                try
                {
                    if (!startedFlag)
                    {
                        startedFlag = true;
                        await NotifyFccStarted();
                    }
                }
                catch (Exception ex)
                {
                    logger.Error($" [x] OnStateChange handler failed {ex}");
                }
            };

            fdcServerHostApp.OnCurrentFuellingStatusChange += async (s, a) =>
            {
                if (this.isStopped)
                {
                    return;
                }

                try
                {
                    var pump = s as IFdcPumpController;
                    if (pump == null)
                    {
                        logger.Warn(" [x] OnCurrentFuellingStatusChange ignored: sender is not an IFdcPumpController");
                        return;
                    }

                    if (a.Transaction.Finished == true)
                    {
                        var notification = new NotificationRpc()
                        {
                            Method = "OnCurrentFuellingStatusChange",
                            Params = new List<JsonRpcObjectMethodParameter>()
                            {
                                Param("PumpId", pump.PumpId.ToString()),
                                Param("NozzleId", a.Transaction.Nozzle.LogicalId.ToString()),
                                Param("State", "Finished"),
                                Param("SequenceNumberGeneratedOnPhysicalPump", a.Transaction.SequenceNumberGeneratedOnPhysicalPump.ToString()),
                                Param("Amount", FormatScaled(a.Transaction.Amount, pump.AmountDecimalDigits)),
                                Param("Volume", FormatScaled(a.Transaction.Volumn, pump.VolumeDecimalDigits)),
                                Param("Price", FormatScaled(a.Transaction.Price, pump.PriceDecimalDigits)),
                                Param("VolumeTotalizer", a.Transaction.VolumeTotalizer?.ToString() ?? "-1"),
                                Param("ReleaseToken", a.ReleaseToken.ToString()),
                                Param("BuId", this.buId)
                            }
                        };
                        await SendToRemoteRpcServer(notification);
                    }
                    else if (ManuallyAuthorizePump)
                    {
                        // In-progress fuelling updates are only forwarded when manual authorization is enabled.
                        var notification = new NotificationRpc()
                        {
                            Method = "OnCurrentFuellingStatusChange",
                            Params = new List<JsonRpcObjectMethodParameter>()
                            {
                                Param("PumpId", pump.PumpId.ToString()),
                                Param("NozzleId", a.Transaction.Nozzle.LogicalId.ToString()),
                                Param("State", "Fueling"),
                                Param("Amount", FormatScaled(a.Transaction.Amount, pump.AmountDecimalDigits)),
                                Param("Volume", FormatScaled(a.Transaction.Volumn, pump.VolumeDecimalDigits)),
                                Param("Price", FormatScaled(a.Transaction.Price, pump.PriceDecimalDigits)),
                                Param("BuId", this.buId)
                            }
                        };
                        await SendToRemoteRpcServer(notification);
                    }
                }
                catch (Exception ex)
                {
                    logger.Error($" [x] OnCurrentFuellingStatusChange handler failed {ex}");
                }
            };

            return Task.CompletedTask;
        }

        /// <summary>Builds one name/value pair of a notification.</summary>
        private static JsonRpcObjectMethodParameter Param(string name, string value)
        {
            return new JsonRpcObjectMethodParameter() { Name = name, Value = value };
        }

        /// <summary>
        /// Divides <paramref name="rawValue"/> by 10^<paramref name="decimalDigits"/> and formats the result
        /// with the invariant culture so the decimal separator on the wire does not depend on the host locale.
        /// </summary>
        private static string FormatScaled(double rawValue, double decimalDigits)
        {
            return (rawValue / Math.Pow(10, decimalDigits)).ToString(CultureInfo.InvariantCulture);
        }

        /// <summary>
        /// Publishes <paramref name="notification"/> as an RPC request to <see cref="OutgoingRpcExchange"/> /
        /// <see cref="OutgoingRpcKey"/> and waits for the reply. Failures are logged and not rethrown.
        /// </summary>
        private async Task SendToRemoteRpcServer(NotificationRpc notification)
        {
            string output = JsonConvert.SerializeObject(notification);
            logger.Info($" [x] Outgoing Request {output}");
            try
            {
                // NOTE(review): the generic arguments were lost in this copy of the file; ResponseRpc is
                // assumed as the reply type - confirm against the remote contract.
                await _client.RequestAsync<NotificationRpc, ResponseRpc>(notification, ctx => ctx
                    .UseRequestConfiguration(cfg => cfg
                        .PublishRequest(p => p
                            .OnDeclaredExchange(e => e
                                .WithName(OutgoingRpcExchange)
                                .WithAutoDelete())
                            .WithRoutingKey(OutgoingRpcKey)
                            .WithProperties(prop => prop.DeliveryMode = 1))
                        .ConsumeResponse(r => r
                            .Consume(c => c
                                .WithRoutingKey($"outgoing_response_key{buId}"))
                            .FromDeclaredQueue(q => q
                                .WithName($"outgoing_response_queue{buId}")
                                .WithAutoDelete())
                            .OnDeclaredExchange(e => e
                                .WithName($"outgoing_response_exchange{buId}")
                                .WithAutoDelete()
                            )
                        )
                    )).ConfigureAwait(false);
                logger.Info($" [x] Outgoing Response Received");
            }
            catch (Exception e)
            {
                logger.Error($" [x] Outgoing Exceptioned {e}");
            }
        }

        /// <summary>
        /// Registers <see cref="SendValuesThoughRpcAsync"/> as the responder for incoming requests on the
        /// queue/exchange/routing key derived from <see cref="buId"/>. Failures are logged and not rethrown.
        /// </summary>
        private async Task SetupLocalMqRpcServer()
        {
            try
            {
                await _client.RespondAsync<RequestRpc, ResponseRpc>(
                    async request => await SendValuesThoughRpcAsync(request),
                    ctx => ctx.UseRespondConfiguration(cfg => cfg
                        .Consume(c => c
                            .WithRoutingKey($"{IncomingRpcKey}{buId}"))
                        .FromDeclaredQueue(q => q
                            .WithName($"incoming_queue{buId}")
                            .WithAutoDelete())
                        .OnDeclaredExchange(e => e
                            .WithName($"{IncomingRpcExchange}{buId}")
                            .WithAutoDelete()))).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                logger.Error(e);
            }
        }

        /// <summary>True when <paramref name="request"/> names <paramref name="method"/> (case-insensitive, culture-independent).</summary>
        private static bool IsMethod(RequestRpc request, string method)
        {
            return request != null
                && string.Equals(request.Method, method, StringComparison.OrdinalIgnoreCase);
        }

        /// <summary>
        /// True when <paramref name="request"/> names <paramref name="method"/> and carries exactly
        /// <paramref name="parameterNames"/>.Length parameters, each expected name being present.
        /// </summary>
        private static bool IsCall(RequestRpc request, string method, params string[] parameterNames)
        {
            return IsMethod(request, method)
                && request.Params != null
                && request.Params.Count == parameterNames.Length
                && parameterNames.All(name => request.Params.Any(p => p.Name == name));
        }

        /// <summary>Returns the value of the first parameter called <paramref name="name"/>.</summary>
        private static string GetValue(RequestRpc request, string name)
        {
            return request.Params.First(p => p.Name == name).Value;
        }

        /// <summary>Parses the named parameter as an integer using the invariant culture.</summary>
        private static int GetInt(RequestRpc request, string name)
        {
            return int.Parse(GetValue(request, name), CultureInfo.InvariantCulture);
        }

        /// <summary>Parses the named parameter as a double using the invariant culture.</summary>
        private static double GetDouble(RequestRpc request, string name)
        {
            return double.Parse(GetValue(request, name), CultureInfo.InvariantCulture);
        }

        /// <summary>Builds a response that echoes the request id and carries <paramref name="result"/>.</summary>
        private static ResponseRpc Reply(RequestRpc request, string result)
        {
            return new ResponseRpc() { Id = request.Id, Result = result };
        }

        /// <summary>
        /// Handles one incoming RPC request. Supported methods (matched case-insensitively):
        /// authorizepump, getavailabletrxs, cleartrx, getnozzleproductconfig, locktrx, unlocktrx, getfueltrxs.
        /// An unknown method, a parameter list that does not match, or any exception yields an
        /// empty <see cref="ResponseRpc"/>. The request is always acknowledged.
        /// </summary>
        // NOTE(review): the return type's generic arguments were lost in this copy of the file;
        // TypedAcknowlegement<ResponseRpc> (RawRabbit's own spelling) is assumed - confirm.
        private async Task<TypedAcknowlegement<ResponseRpc>> SendValuesThoughRpcAsync(RequestRpc request)
        {
            ResponseRpc response = null;
            try
            {
                string requestString = JsonConvert.SerializeObject(request);
                logger.Info($" [x] Incoming Request {requestString}");

                /*
                 * AuthorizePump(int pumpId, double maxTrxAmount, double maxTrxVolume)
                 */
                if (IsCall(request, "authorizepump", "PumpId", "MaxTrxAmount", "MaxTrxVolume"))
                {
                    if (ManuallyAuthorizePump)
                    {
                        int pumpId = GetInt(request, "PumpId");
                        double maxTrxAmount = GetDouble(request, "MaxTrxAmount");
                        double maxTrxVolume = GetDouble(request, "MaxTrxVolume");
                        var authorized = await fdcServerHostApp.AuthorizePumpAsync(pumpId, maxTrxAmount, maxTrxVolume);
                        response = Reply(request, authorized ? "true" : "false");
                    }
                    else
                    {
                        response = Reply(request, "false");
                    }
                }
                else if (IsCall(request, "getavailabletrxs", "PumpId"))
                {
                    int pumpId = GetInt(request, "PumpId");
                    var trxs = await fdcServerHostApp.GetAvailableFuelSaleTrxsWithDetailsAsync(pumpId, -1, 30);
                    var pumpController = fdcServerHostApp.GetFdcPumpController(pumpId);
                    if (pumpController != null && trxs != null && trxs.Any())
                    {
                        var fuelSaleTransactionResponse = trxs
                            .Select(trx => FuelSaleTransactionResponse.CreateInstance(trx, pumpController))
                            .ToList();
                        response = Reply(request, JsonConvert.SerializeObject(fuelSaleTransactionResponse));
                    }
                    else
                    {
                        response = Reply(request, "");
                    }
                }
                else if (IsCall(request, "cleartrx", "PumpId", "SequenceNo", "ReleaseToken"))
                {
                    int pumpId = GetInt(request, "PumpId");
                    int sequenceNo = GetInt(request, "SequenceNo");
                    int releaseToken = GetInt(request, "ReleaseToken");
                    var cleared = await fdcServerHostApp.ClearFuelSaleTrxAndNotifyAllFdcClientsAsync(
                        pumpId, sequenceNo.ToString(CultureInfo.InvariantCulture), releaseToken, "199");
                    var pumpController = fdcServerHostApp.GetFdcPumpController(pumpId);
                    response = pumpController != null && cleared != null
                        ? Reply(request, JsonConvert.SerializeObject(FuelSaleTransactionResponse.CreateInstance(cleared, pumpController)))
                        : Reply(request, "");
                }
                else if (IsMethod(request, "getnozzleproductconfig"))
                {
                    var nozzleInfos = fdcServerHostApp.GetNozzleExtraInfos();
                    response = nozzleInfos != null
                        ? Reply(request, JsonConvert.SerializeObject(nozzleInfos))
                        : Reply(request, "");
                }
                else if (IsCall(request, "locktrx", "PumpId", "SequenceNo", "ReleaseToken"))
                {
                    int pumpId = GetInt(request, "PumpId");
                    int sequenceNo = GetInt(request, "SequenceNo");
                    int releaseToken = GetInt(request, "ReleaseToken");
                    var pumpController = fdcServerHostApp.GetFdcPumpController(pumpId);
                    var locked = await fdcServerHostApp.LockFuelSaleTrxAndNotifyAllFdcClientsAsync(199, pumpId, sequenceNo, releaseToken);
                    response = pumpController != null && locked != null
                        ? Reply(request, JsonConvert.SerializeObject(FuelSaleTransactionResponse.CreateInstance(locked, pumpController)))
                        : Reply(request, "");
                }
                else if (IsCall(request, "unlocktrx", "PumpId", "SequenceNo", "ReleaseToken"))
                {
                    int pumpId = GetInt(request, "PumpId");
                    int sequenceNo = GetInt(request, "SequenceNo");
                    int releaseToken = GetInt(request, "ReleaseToken");
                    var pumpController = fdcServerHostApp.GetFdcPumpController(pumpId);
                    var unlocked = await fdcServerHostApp.UnlockFuelSaleTrxAndNotifyAllFdcClientsAsync(199, pumpId, sequenceNo, releaseToken);
                    response = pumpController != null && unlocked != null
                        ? Reply(request, JsonConvert.SerializeObject(FuelSaleTransactionResponse.CreateInstance(unlocked, pumpController)))
                        : Reply(request, "");
                }
                else if (IsCall(request, "getfueltrxs", "ReleaseToken"))
                {
                    int releaseToken = GetInt(request, "ReleaseToken");
                    var parameters = new List<ApiDataParameter>
                    {
                        new ApiDataParameter { Name = "ReleaseToken", Value = releaseToken.ToString(CultureInfo.InvariantCulture) }
                    };
                    var apiData = new ApiData() { Parameters = parameters };
                    var fuelSales = await fdcServerHostApp.GetFuelSaleTrxDetailsAsync(apiData);
                    response = fuelSales != null
                        ? Reply(request, JsonConvert.SerializeObject(fuelSales))
                        : Reply(request, "");
                }
                else
                {
                    response = new ResponseRpc();
                }

                logger.Info($" [x] Incoming Response {response.Result}");
            }
            catch (Exception ex)
            {
                // A failed request is an error condition, so it is logged at Error level.
                logger.Error($" [x] Incoming Exceptioned {ex}");
            }

            if (response == null)
            {
                response = new ResponseRpc();
            }

            return Respond.Ack(response);
        }

        /// <summary>Clears the stopped flag so event handlers forward notifications.</summary>
        /// <returns>A completed task with result <c>true</c>.</returns>
        public Task<bool> Start()
        {
            this.isStopped = false;
            return Task.FromResult(true);
        }

        /// <summary>Sets the stopped flag so event handlers return without forwarding.</summary>
        /// <returns>A completed task with result <c>true</c>.</returns>
        public Task<bool> Stop()
        {
            this.isStopped = true;
            return Task.FromResult(true);
        }

        /// <summary>Sends the "OnFccStarted" notification carrying this instance's <see cref="buId"/>.</summary>
        private async Task NotifyFccStarted()
        {
            var notification = new NotificationRpc()
            {
                Method = "OnFccStarted",
                Params = new List<JsonRpcObjectMethodParameter>()
                {
                    Param("BuId", this.buId),
                }
            };
            await SendToRemoteRpcServer(notification);
        }
    }
}