using Applications.FDC;
using Edge.Core.Processor;using Edge.Core.IndustryStandardInterface.Pump;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Mail;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Tasks;
using System.Timers;
using Edge.Core.Database.Models;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using RabbitMQ.Client.Content;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.MessagePatterns;
using RawRabbit;
using RawRabbit.Configuration;
using RawRabbit.Enrichers.GlobalExecutionId;
using RawRabbit.Enrichers.MessageContext;
using RawRabbit.Enrichers.MessageContext.Context;
using RawRabbit.Instantiation;
using Edge.Core.UniversalApi;
using RawRabbit.Operations.Respond.Acknowledgement;
namespace Applications.PumpInfoToRemote
{
public class App : IAppProcessor
{
private static IBusClient _client;
internal static NLog.Logger logger = NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("Application");
private static FdcServerHostApp fdcServerHostApp = null;
public string RabbitMsgQueueConfigName { get; set; }
public string IncomingRpcKey { get; set; }
public string IncomingRpcExchange { get; set; }
public string OutgoingRpcKey { get; set; }
public string OutgoingRpcExchange { get; set; }
public string sn { get; set; }
public string buId { get; set; }
public bool ManuallyAuthorizePump { get; set; }
private bool isStopped = false;
public string MetaConfigName { get; set; }
private bool startedFlag = false;
//private RpcClient rpcClient;
///
///
///
/// this business unit that the fusion belongs to
public App(string rabbitMsgQueueConfigName,
string outgoingRpcKey,
string outgoingRpcExchange,
string incomingRpcKey,
string incomingRpcExchange,
string buId,
int manuallyAuthorizePump)
{
RabbitMsgQueueConfigName = rabbitMsgQueueConfigName;
OutgoingRpcKey = outgoingRpcKey;
OutgoingRpcExchange = outgoingRpcExchange;
IncomingRpcKey = incomingRpcKey;
IncomingRpcExchange = incomingRpcExchange;
this.buId = buId;
ManuallyAuthorizePump = manuallyAuthorizePump == 1 ? true : false;
}
public void Init(IEnumerable processors)
{
fdcServerHostApp = processors.OfType().FirstOrDefault();
if (fdcServerHostApp == null)
throw new ArgumentNullException("Can't find the FdcServerHostApp from processors");
_client = RawRabbitFactory.CreateSingleton(new RawRabbitOptions
{
ClientConfiguration = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile(RabbitMsgQueueConfigName)
.Build()
.Get(),
Plugins = p => p
.UseGlobalExecutionId()
.UseMessageContext()
});
SetupLocalMqRpcServer().GetAwaiter().GetResult();
//SetupLocalMqPublisher().GetAwaiter().GetResult();
}
private async Task SetupLocalMqPublisher()
{
fdcServerHostApp.OnStateChange += async (s, a) =>
{
if (this.isStopped) return;
var pump = s as IFdcPumpController;
//var notification = new NotificationRpc()
//{
// Method = "OnStateChange",
// Params = new List()
// {
// new JsonRpcObjectMethodParameter()
// {
// Name = "PumpId", Value = pump.PumpId.ToString()
// },
// new JsonRpcObjectMethodParameter()
// {
// Name = "NozzleId",
// Value = a.StateChangedNozzles == null
// ? ""
// : a.StateChangedNozzles.FirstOrDefault()?.LogicalId.ToString() ?? ""
// },
// new JsonRpcObjectMethodParameter()
// {
// Name = "State", Value = a.NewPumpState.ToString()
// },
// new JsonRpcObjectMethodParameter()
// {
// Name = "BuId", Value = this.buId
// },
// }
//};
//SendToRemoteRpcServer(notification);
if (!startedFlag)
{
startedFlag = true;
await NotifyFccStarted();
}
};
fdcServerHostApp.OnCurrentFuellingStatusChange += async (s, a) =>
{
if (this.isStopped) return;
var pump = s as IFdcPumpController;
if (a.Transaction.Finished == true)
{
var notification = new NotificationRpc()
{
Method = "OnCurrentFuellingStatusChange",
Params = new List()
{
new JsonRpcObjectMethodParameter()
{
Name = "PumpId", Value = pump.PumpId.ToString()
},
new JsonRpcObjectMethodParameter()
{
Name = "NozzleId", Value = a.Transaction.Nozzle.LogicalId.ToString()
},
new JsonRpcObjectMethodParameter()
{
Name = "State", Value = "Finished"
},
new JsonRpcObjectMethodParameter()
{
Name = "SequenceNumberGeneratedOnPhysicalPump",
Value = a.Transaction.SequenceNumberGeneratedOnPhysicalPump.ToString()
},
new JsonRpcObjectMethodParameter()
{
Name = "Amount", Value = (a.Transaction.Amount / Math.Pow(10, pump.AmountDecimalDigits)).ToString()
},
new JsonRpcObjectMethodParameter()
{
Name = "Volume", Value = (a.Transaction.Volumn / Math.Pow(10, pump.VolumeDecimalDigits)).ToString()
},
new JsonRpcObjectMethodParameter()
{
Name = "Price", Value = (a.Transaction.Price / Math.Pow(10, pump.PriceDecimalDigits)).ToString()
},
new JsonRpcObjectMethodParameter()
{
Name = "VolumeTotalizer", Value = a.Transaction.VolumeTotalizer?.ToString()??"-1"
},
new JsonRpcObjectMethodParameter()
{
Name = "ReleaseToken", Value = a.ReleaseToken.ToString()
},
new JsonRpcObjectMethodParameter()
{
Name = "BuId", Value = this.buId
}
}
};
await SendToRemoteRpcServer(notification);
}
else if (ManuallyAuthorizePump)
{
var notification = new NotificationRpc()
{
Method = "OnCurrentFuellingStatusChange",
Params = new List()
{
new JsonRpcObjectMethodParameter()
{
Name = "PumpId", Value = pump.PumpId.ToString()
},
new JsonRpcObjectMethodParameter()
{
Name = "NozzleId", Value = a.Transaction.Nozzle.LogicalId.ToString()
},
new JsonRpcObjectMethodParameter()
{
Name = "State", Value = "Fueling"
},
new JsonRpcObjectMethodParameter()
{
Name = "Amount", Value = (a.Transaction.Amount / Math.Pow(10, pump.AmountDecimalDigits)).ToString()
},
new JsonRpcObjectMethodParameter()
{
Name = "Volume", Value = (a.Transaction.Volumn / Math.Pow(10, pump.VolumeDecimalDigits)).ToString()
},
new JsonRpcObjectMethodParameter()
{
Name = "Price", Value = (a.Transaction.Price / Math.Pow(10, pump.PriceDecimalDigits)).ToString()
},
new JsonRpcObjectMethodParameter()
{
Name = "BuId", Value = this.buId
}
}
};
await SendToRemoteRpcServer(notification);
}
};
}
private async Task SendToRemoteRpcServer(NotificationRpc notification)
{
string output = JsonConvert.SerializeObject(notification);
logger.Info($" [x] Outgoing Request {output}");
try
{
await _client.RequestAsync(notification, ctx => ctx
.UseRequestConfiguration(cfg => cfg
.PublishRequest(p => p
.OnDeclaredExchange(e => e
.WithName(OutgoingRpcExchange)
.WithAutoDelete())
.WithRoutingKey(OutgoingRpcKey)
.WithProperties(prop => prop.DeliveryMode = 1))
.ConsumeResponse(r => r
.Consume(c => c
.WithRoutingKey($"outgoing_response_key{buId}"))
.FromDeclaredQueue(q => q
.WithName($"outgoing_response_queue{buId}")
.WithAutoDelete())
.OnDeclaredExchange(e => e
.WithName($"outgoing_response_exchange{buId}")
.WithAutoDelete()
)
)
)).ConfigureAwait(false);
logger.Info($" [x] Outgoing Response Received");
}
catch (Exception e)
{
logger.Error($" [x] Outgoing Exceptioned {e}");
}
}
private async Task SetupLocalMqRpcServer()
{
try
{
await _client.RespondAsync(async request => await SendValuesThoughRpcAsync(request), ctx =>
ctx.UseRespondConfiguration(cfg => cfg
.Consume(c => c
.WithRoutingKey($"{IncomingRpcKey}{buId}"))
.FromDeclaredQueue(q => q
.WithName($"incoming_queue{buId}")
.WithAutoDelete())
.OnDeclaredExchange(e => e
.WithName($"{IncomingRpcExchange}{buId}")
.WithAutoDelete()))).ConfigureAwait(false);
}
catch (Exception e)
{
logger.Error(e);
}
}
private async Task> SendValuesThoughRpcAsync(RequestRpc request)
{
ResponseRpc response = null;
try
{
string requestString = JsonConvert.SerializeObject(request);
logger.Info($" [x] Incoming Request {requestString}");
/*
* AuthorizePump(int pumpId, double maxTrxAmount, double maxTrxVolume)
*/
if (request.Method.ToLower() == "authorizepump"
&& request.Params != null && request.Params.Count == 3
&& request.Params.Any(p => p.Name == "PumpId") &&
request.Params.Any(p => p.Name == "MaxTrxAmount") &&
request.Params.Any(p => p.Name == "MaxTrxVolume"))
{
if (ManuallyAuthorizePump)
{
int pumpId = int.Parse(request.Params.First(p => p.Name == "PumpId").Value);
double maxTrxAmount =
double.Parse(request.Params.First(p => p.Name == "MaxTrxAmount").Value);
double maxTrxVolume =
double.Parse(request.Params.First(p => p.Name == "MaxTrxVolume").Value);
var result = await fdcServerHostApp.AuthorizePumpAsync(pumpId, maxTrxAmount, maxTrxVolume);
if (result)
response = new ResponseRpc()
{ Id = request.Id, Result = "true" };
else
response = new ResponseRpc()
{ Id = request.Id, Result = "false" };
}
else
{
response = new ResponseRpc()
{ Id = request.Id, Result = "false" };
}
}
else if (request.Method.ToLower() == "getavailabletrxs"
&& request.Params != null && request.Params.Count == 1
&& request.Params.Any(p => p.Name == "PumpId"))
{
int pumpId = int.Parse(request.Params.First(p => p.Name == "PumpId").Value);
var result = await fdcServerHostApp.GetAvailableFuelSaleTrxsWithDetailsAsync(pumpId, -1, 30);
var pumpController = fdcServerHostApp.GetFdcPumpController(pumpId);
if (pumpController != null && result != null && result.Any())
{
var fuelSaleTransactionResponse = new List();
foreach (var trx in result)
{
fuelSaleTransactionResponse.Add(FuelSaleTransactionResponse.CreateInstance(trx, pumpController));
}
response = new ResponseRpc()
{ Id = request.Id, Result = JsonConvert.SerializeObject(fuelSaleTransactionResponse) };
}
else
{
response = new ResponseRpc()
{ Id = request.Id, Result = "" };
}
}
else if (request.Method.ToLower() == "cleartrx"
&& request.Params != null && request.Params.Count == 3
&& request.Params.Any(p => p.Name == "PumpId")
&& request.Params.Any(p => p.Name == "SequenceNo")
&& request.Params.Any(p => p.Name == "ReleaseToken"))
{
int pumpId = int.Parse(request.Params.First(p => p.Name == "PumpId").Value);
int sequenceNo = int.Parse(request.Params.First(p => p.Name == "SequenceNo").Value);
int releaseToken = int.Parse(request.Params.First(p => p.Name == "ReleaseToken").Value);
var result = await fdcServerHostApp.ClearFuelSaleTrxAndNotifyAllFdcClientsAsync(pumpId, sequenceNo.ToString(), releaseToken, "199");
var pumpController = fdcServerHostApp.GetFdcPumpController(pumpId);
if (pumpController != null && result != null)
response = new ResponseRpc()
{ Id = request.Id, Result = JsonConvert.SerializeObject(FuelSaleTransactionResponse.CreateInstance(result, pumpController)) };
else
response = new ResponseRpc()
{ Id = request.Id, Result = "" };
}
else if (request.Method.ToLower() == "getnozzleproductconfig")
{
var result = fdcServerHostApp.GetNozzleExtraInfos();
if (result != null)
response = new ResponseRpc()
{ Id = request.Id, Result = JsonConvert.SerializeObject(result) };
else
response = new ResponseRpc()
{ Id = request.Id, Result = "" };
}
else if (request.Method.ToLower() == "locktrx"
&& request.Params != null && request.Params.Count == 3
&& request.Params.Any(p => p.Name == "PumpId")
&& request.Params.Any(p => p.Name == "SequenceNo")
&& request.Params.Any(p => p.Name == "ReleaseToken"))
{
int pumpId = int.Parse(request.Params.First(p => p.Name == "PumpId").Value);
int sequenceNo = int.Parse(request.Params.First(p => p.Name == "SequenceNo").Value);
int releaseToken = int.Parse(request.Params.First(p => p.Name == "ReleaseToken").Value);
var pumpController = fdcServerHostApp.GetFdcPumpController(pumpId);
var result = await fdcServerHostApp.LockFuelSaleTrxAndNotifyAllFdcClientsAsync(199, pumpId, sequenceNo, releaseToken);
if (pumpController != null && result != null)
response = new ResponseRpc()
{ Id = request.Id, Result = JsonConvert.SerializeObject(FuelSaleTransactionResponse.CreateInstance(result, pumpController)) };
else
response = new ResponseRpc()
{ Id = request.Id, Result = "" };
}
else if (request.Method.ToLower() == "unlocktrx"
&& request.Params != null && request.Params.Count == 3
&& request.Params.Any(p => p.Name == "PumpId")
&& request.Params.Any(p => p.Name == "SequenceNo")
&& request.Params.Any(p => p.Name == "ReleaseToken"))
{
int pumpId = int.Parse(request.Params.First(p => p.Name == "PumpId").Value);
int sequenceNo = int.Parse(request.Params.First(p => p.Name == "SequenceNo").Value);
int releaseToken = int.Parse(request.Params.First(p => p.Name == "ReleaseToken").Value);
var pumpController = fdcServerHostApp.GetFdcPumpController(pumpId);
var result = await fdcServerHostApp.UnlockFuelSaleTrxAndNotifyAllFdcClientsAsync(199, pumpId, sequenceNo, releaseToken);
if (pumpController != null && result != null)
response = new ResponseRpc()
{ Id = request.Id, Result = JsonConvert.SerializeObject(FuelSaleTransactionResponse.CreateInstance(result, pumpController)) };
else
response = new ResponseRpc()
{ Id = request.Id, Result = "" };
}
else if (request.Method.ToLower() == "getfueltrxs"
&& request.Params != null && request.Params.Count == 1
&& request.Params.Any(p => p.Name == "ReleaseToken"))
{
int releaseToken = int.Parse(request.Params.First(p => p.Name == "ReleaseToken").Value);
var parameters = new List();
parameters.Add(new ApiDataParameter
{
Name = "ReleaseToken",
Value = releaseToken.ToString()
});
var apiData = new ApiData()
{
Parameters = parameters
};
var fuelSales = await fdcServerHostApp.GetFuelSaleTrxDetailsAsync(apiData);
if (fuelSales != null)
response = new ResponseRpc()
{ Id = request.Id, Result = JsonConvert.SerializeObject(fuelSales) };
else
response = new ResponseRpc()
{ Id = request.Id, Result = "" };
}
else
{
response = new ResponseRpc();
}
logger.Info($" [x] Incoming Response {response.Result}");
}
catch (Exception ex)
{
logger.Info($" [x] Incoming Exceptioned {ex}");
}
if (response == null)
{
response = new ResponseRpc();
}
return Respond.Ack(response);
}
public Task Start()
{
this.isStopped = false;
return Task.FromResult(true);
}
public Task Stop()
{
this.isStopped = true;
return Task.FromResult(true);
}
private async Task NotifyFccStarted()
{
var notification = new NotificationRpc()
{
Method = "OnFccStarted",
Params = new List()
{
new JsonRpcObjectMethodParameter()
{
Name = "BuId", Value = this.buId
},
}
};
await SendToRemoteRpcServer(notification);
}
}
}