using Edge.Core.Processor;
using Edge.Core.IndustryStandardInterface.Pump;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;
using Edge.Core.IndustryStandardInterface.ATG;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using Edge.Core.Database;
using AutoMapper;
using Edge.Core.UniversalApi;
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using System.Text.RegularExpressions;
using Application.ATG_Classic_App.Model;
using System.Diagnostics.CodeAnalysis;
using System.Timers;
using System.Threading;
using Edge.Core.Processor.Dispatcher.Attributes;

namespace Application.ATG_Classic_App
{
    // NOTE(review): the reviewed copy of this file had every generic type-argument list stripped.
    // Arguments fixed by visible usage were restored (events, Tuple/List/Dictionary shapes, Task<bool>, ...).
    // The following were inferred from naming only and must be confirmed against the project:
    //   - IProbeHandler (element type of the probe handlers), IProcessor (Init parameter element type)
    //   - UniversalApiHub (service resolved before FireEvent)
    //   - Model.Alarm (database alarm entity), ATG Delivery / Inventory (service return element types)
    //   - the return type of GetTanksReadingAsync()
    // IDbContextFactory, DefaultDbContextFactory and CreateDbContext() are kept exactly as they appeared
    // because their type arguments (if any) could not be determined.

    /// <summary>
    /// ATG console application: polls every configured tank through its probe handler, buffers the
    /// readings in memory, periodically persists inventories, tracks alarms and manual deliveries, and
    /// exposes configuration and query operations through the universal API.
    /// </summary>
    [UniversalApi(Name = "OnStateChange", EventDataType = typeof(AtgStateChangeEventArg), Description = "will fire on Atg state changed")]
    [UniversalApi(Name = "OnAlarm", EventDataType = typeof(AtgAlarmEventArg), Description = "will fire on Atg alarms detected")]
    [MetaPartsDescriptor(
        "lang-zh-cn:Pro-gauge 液位仪lang-en-us:Pro-gauge ATG",
        "lang-zh-cn:用于总控所有连入此系统的 Pro-gauge 探棒,并提供液位仪控制台功能lang-en-us:Used for overall control all Pro-gauge probes connected in the system, and providing functions of the ATG console",
        new[] { "lang-zh-cn:液位仪lang-en-us:ATG" })]
    public class App : IAppProcessor, IAutoTankGaugeController
    {
        private ILogger logger = NullLogger.Instance;
        private System.Timers.Timer polling_fast_TankReadingTimer;

        /// <summary>
        /// Fast polling interval in milliseconds.
        /// DO NOT lower down this value unless you understand the consequence.
        /// </summary>
        public int polling_fast_TankReadingTimer_Internval = 500;

        /// <summary>Maximum age, in seconds, of a reading kept in the in-memory buffer.</summary>
        private int polling_fast_TankReadingTimer_Buffer_MaxLength_By_Second = 60 * 60 * 3;

        /// <summary>
        /// Tracks when inventories were last saved to the database; compared against the current time on
        /// every fast poll, and once the inventory sampling interval has elapsed a new save is started.
        /// </summary>
        private DateTime lastInventoriesSavedIntoDatabaseTime = DateTime.MinValue;

        private IServiceProvider services;
        private IEnumerable<IProbeHandler> probeHandlers;
        private IEnumerable<RichTankInfo> richTankInfos = null;

        /// <summary>
        /// Serializes probe reads. A SemaphoreSlim is used because the guarded section awaits: a SpinLock
        /// with owner tracking must be released on the acquiring thread, which an await cannot guarantee.
        /// </summary>
        private readonly SemaphoreSlim getTankReadingGate = new SemaphoreSlim(1, 1);

        /// <summary>
        /// Buffered readings per tank number, oldest first; each entry is (time the reading was taken, reading).
        /// NOTE(review): this dictionary is touched by the timer callback and by API calls without
        /// synchronization — confirm whether a lock or concurrent collection is wanted.
        /// </summary>
        private Dictionary<int, List<Tuple<DateTime, TankReading>>> tankReadings_fast_buffer
            = new Dictionary<int, List<Tuple<DateTime, TankReading>>>();

        private bool skipDatabaseMigration = false;

        public string MetaConfigName { get; set; }

        // NOTE(review): never assigned in this file (the constructor's 'id' argument is ignored) — confirm intent.
        public int DeviceId { get; }

        /// <summary>Tanks currently loaded from configuration, or null when no valid configuration is loaded.</summary>
        public IEnumerable<Tank> Tanks => this.richTankInfos?.Select(c => c.Tank);

        public SystemUnit SystemUnit { get; }

        public IDbContextFactory DbContextFactory { get; set; }

        public AtgState State { get; private set; }

        public event EventHandler<AtgStateChangeEventArg> OnStateChange;
        public event EventHandler<AtgAlarmEventArg> OnAlarm;

        #region Construction and lifecycle

        /// <summary>
        /// Creates the app, resolves the logger, sets up the database context factory and wires the fast
        /// polling timer (the timer is not started until <see cref="Start"/>).
        /// </summary>
        /// <param name="services">Service provider used to resolve logging, mapping and API hub services.</param>
        /// <param name="id">Not used by this constructor.</param>
        /// <param name="skipDatabaseMigration">When true, <see cref="Start"/> does not run database migrations.</param>
        public App(IServiceProvider services, int id, bool skipDatabaseMigration)
        {
            this.skipDatabaseMigration = skipDatabaseMigration;
            this.services = services;
            var loggerFactory = services.GetRequiredService<ILoggerFactory>();
            this.logger = loggerFactory.CreateLogger("Application");
            this.DbContextFactory = new DefaultDbContextFactory();
            this.DbContextFactory.AddProvider(new ClassicAtgDbContextProvider());

            // A fast polling cycle gives a better real time experience.
            this.polling_fast_TankReadingTimer = new System.Timers.Timer(this.polling_fast_TankReadingTimer_Internval);
            this.polling_fast_TankReadingTimer.Elapsed += async (_, __) =>
            {
                var tanks = this.richTankInfos;
                if (tanks == null || !tanks.Any())
                    return;

                // Pause the timer while a cycle runs so cycles never overlap.
                this.polling_fast_TankReadingTimer.Stop();
                try
                {
                    await this.PollAllTanksAsync(tanks);
                }
                catch (Exception ex)
                {
                    // This is an async void event handler: an exception escaping it would be unobserved
                    // and can terminate the process, so it is logged and the next cycle is attempted.
                    this.logger.LogError(ex, "ATG_Classic_App failed in a fast polling cycle");
                }
                finally
                {
                    this.polling_fast_TankReadingTimer.Start();
                }
            };

            // Buffered readings belong to the previous tank set once tanks are reloaded.
            this.OnStateChange += (_, evtArg) =>
            {
                if (evtArg.State == AtgState.TanksReloaded)
                {
                    this.tankReadings_fast_buffer.Clear();
                }
            };
        }

        public App(IServiceProvider services, int id) : this(services, id, false)
        {
        }

        /// <summary>Collects the probe handlers among <paramref name="processors"/>.</summary>
        /// <exception cref="ArgumentException">Two probe handlers share the same probe DeviceId.</exception>
        public void Init(IEnumerable<IProcessor> processors)
        {
            var handlers = processors.WithHandlerOrApp<IProbeHandler>().SelectHandlerOrAppThenCast<IProbeHandler>().ToList();
            if (handlers.GroupBy(p => p.Probe.DeviceId).Any(g => g.Count() > 1))
                throw new ArgumentException("Duplicate Id in Probe handlers");
            this.probeHandlers = handlers;
        }

        /// <summary>
        /// Migrates the database (unless skipped), purges old history, loads the tank configuration,
        /// raises the resulting state change and starts the fast polling timer.
        /// </summary>
        /// <returns>Always true.</returns>
        /// <exception cref="InvalidOperationException">The database migration failed.</exception>
        public async Task<bool> Start()
        {
            if (!this.skipDatabaseMigration)
            {
                this.logger.LogInformation("Migrating database...");
                var migrateDbContext = this.DbContextFactory.CreateDbContext();
                try
                {
                    migrateDbContext.Database.Migrate();
                }
                catch (Exception exx)
                {
                    string migrationsStr = "";
                    string pendingMigrationsStr = "";
                    string appliedMigrationsStr = "";
                    try
                    {
                        migrationsStr = string.Join(", ", migrateDbContext.Database?.GetMigrations() ?? Enumerable.Empty<string>());
                        pendingMigrationsStr = string.Join(", ", migrateDbContext.Database?.GetPendingMigrations() ?? Enumerable.Empty<string>());
                        appliedMigrationsStr = string.Join(", ", migrateDbContext.Database?.GetAppliedMigrations() ?? Enumerable.Empty<string>());
                    }
                    catch (Exception)
                    {
                        // Best effort only: the migration lists are extra diagnostics for the log below.
                    }

                    // Log unconditionally (not only when collecting the diagnostics failed).
                    this.logger.LogError("ATG_Classic_App Exceptioned when Migrating the database, detail: " + exx
                        + System.Environment.NewLine + "migrations are: " + migrationsStr
                        + System.Environment.NewLine + ", pendingMigrations are: " + pendingMigrationsStr
                        + System.Environment.NewLine + ", appliedMigrations are: " + appliedMigrationsStr);
                    throw new InvalidOperationException("failed for migrating the database", exx);
                }

                this.logger.LogInformation(" Migrate database finished.");
            }

            await this.PurgeDatabase();

            var configs = await this.GetConfigAsync();
            this.richTankInfos = this.TryReCreateRichTankInfos(configs.Item2, configs.Item1, this.probeHandlers);
            if (this.richTankInfos == null || !this.richTankInfos.Any())
            {
                this.State = AtgState.Inoperative_MissingConfig;
                this.OnStateChange?.Invoke(this,
                    new AtgStateChangeEventArg(this.State, "Necessary tank configs were missed, add configs and wait for notify"));
            }
            else
            {
                this.State = AtgState.Idle;
                this.OnStateChange?.Invoke(this,
                    new AtgStateChangeEventArg(AtgState.TanksReloaded, "All tanks reloaded due to application started"));
            }

            this.polling_fast_TankReadingTimer.Start();
            return true;
        }

        /// <summary>Stops the fast polling timer and drops the buffered readings.</summary>
        /// <returns>Always true.</returns>
        public Task<bool> Stop()
        {
            // Stop first so a tick cannot refill the buffer right after it was cleared.
            this.polling_fast_TankReadingTimer.Stop();
            this.tankReadings_fast_buffer.Clear();
            return Task.FromResult(true);
        }

        #endregion

        #region UniversalApi - Config

        /// <summary>Returns the most recent overall config and, per tank number, the most recent tank config.</summary>
        [UniversalApi(Description = "Get the TankOverallConfig, and all TankConfigs.")]
        public async Task<Tuple<TankOverallConfig, IEnumerable<TankConfig>>> GetConfigAsync()
        {
            var dbContext = this.DbContextFactory.CreateDbContext();
            var tankOverallConfig = await dbContext.TankOverallConfigs
                .OrderByDescending(c => c.ModifiedTimeStamp)
                .ThenByDescending(c => c.CreatedTimeStamp)
                .FirstOrDefaultAsync();
            var tankConfigs = await LoadLatestTankConfigsAsync(dbContext.TankConfigs);
            return new Tuple<TankOverallConfig, IEnumerable<TankConfig>>(tankOverallConfig, tankConfigs);
        }

        /// <summary>
        /// Inserts <paramref name="inputConfig"/> when its Id is 0, otherwise updates the existing row
        /// (keeping its CreatedTimeStamp), then reloads all tanks and raises OnStateChange.
        /// </summary>
        /// <returns>The same <paramref name="inputConfig"/> instance.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="inputConfig"/> is null.</exception>
        // NOTE(review): the line breaks inside this description appeared as raw newlines in the reviewed
        // copy; they are written as \r\n escapes here — confirm the original escape sequence.
        [UniversalApi(Description = "Udpate or insert the config, the input and sample could be either of->\r\n"
            + "TankOverallConfig: "
            + "{\"TcReference\":25,\"InventorySamplingInterval\":5000,\"DeliveryMode\":0,\"Id\":0},\r\n"
            + "TankConfig: "
            + "{\"TankNumber\":1,\"Label\":\"I\u0027m Tank with Number 1\",\"Diameter\":1000,\"ThermalCoefficient\":0,\"DeliveryDelay\":0,\"TankProfileDatas\":null,\"ProductConfigId\":null,\"ProductConfig\":{\"ProductCode\":\"91\",\"ProductLabel\":\"91#\",\"Id\":0,\"CreatedTimeStamp\":\"0001-01-01T00:00:00\",\"ModifiedTimeStamp\":null},\"TankLimitConfigId\":null,\"TankLimitConfig\":{\"MaxVolume\":1100,\"FullVolume\":1200,\"HighProduct\":900,\"LowProduct\":100,\"HighWaterWarning\":10,\"HighWaterAlarm\":40,\"FuelTemperatureLowLimit\":1,\"FuelTemperatureHighLimit\":2,\"Id\":0,\"CreatedTimeStamp\":\"0001-01-01T00:00:00\",\"ModifiedTimeStamp\":null},\"ProbeConfigId\":null,\"ProbeConfig\":{\"DeviceId\":1,\"ProbeOffset\":0,\"WaterOffset\":0,\"Id\":0,\"CreatedTimeStamp\":\"0001-01-01T00:00:00\",\"ModifiedTimeStamp\":null},\"Id\":0,\"CreatedTimeStamp\":\"0001-01-01T00:00:00\",\"ModifiedTimeStamp\":null}"
            + ",\r\n"
            + "TankLimitConfig: "
            + "{\"MaxVolume\":4400,\"FullVolume\":4800,\"HighProduct\":3600,\"LowProduct\":400,\"HighWaterWarning\":40,\"HighWaterAlarm\":160,\"FuelTemperatureLowLimit\":4,\"FuelTemperatureHighLimit\":8,\"Id\":0,\"CreatedTimeStamp\":\"0001-01-01T00:00:00\",\"ModifiedTimeStamp\":null}"
            + ",\r\n"
            + "ProbeConfig: "
            + "{\"DeviceId\":4,\"ProbeOffset\":0,\"WaterOffset\":0,\"Id\":0,\"CreatedTimeStamp\":\"0001-01-01T00:00:00\",\"ModifiedTimeStamp\":null}"
            + ",\r\n"
            + "ProductConfig: "
            + "{\"ProductCode\":\"94\",\"ProductLabel\":\"94#\",\"Id\":0,\"CreatedTimeStamp\":\"0001-01-01T00:00:00\",\"ModifiedTimeStamp\":null}")]
        public async Task<BaseConfig> UpsertConfigAsync(BaseConfig inputConfig)
        {
            if (inputConfig == null)
                throw new ArgumentNullException(nameof(inputConfig));

            var dbContext = this.DbContextFactory.CreateDbContext();
            if (inputConfig.Id == 0)
            {
                // Create a new row.
                inputConfig.CreatedTimeStamp = DateTime.Now;
                inputConfig.ModifiedTimeStamp = null;
                dbContext.Add(inputConfig);
            }
            else
            {
                // Update the existing row, never overwriting its creation time.
                inputConfig.ModifiedTimeStamp = DateTime.Now;
                dbContext.Entry(inputConfig).State = Microsoft.EntityFrameworkCore.EntityState.Modified;
                dbContext.Entry(inputConfig).Property(c => c.CreatedTimeStamp).IsModified = false;
            }

            await dbContext.SaveChangesAsync();

            var tankConfigs = await LoadLatestTankConfigsAsync(dbContext.TankConfigs);
            var tankOverallConfig = await dbContext.Set<TankOverallConfig>()
                .OrderByDescending(c => c.ModifiedTimeStamp)
                .ThenByDescending(c => c.CreatedTimeStamp)
                .FirstOrDefaultAsync();

            await this.ReloadTanksAndNotifyAsync(
                tankConfigs,
                tankOverallConfig,
                "Necessary tank configs were missed or invalid after the UpsertConfigAsync(...), fix them and wait for notify.",
                "All tanks reloaded due to Config updated");
            return inputConfig;
        }

        /// <summary>
        /// Replaces the profile (height to volume table) of a tank: adds the given points under a new
        /// batch label, removes the tank's points from other batches, then reloads all tanks.
        /// </summary>
        /// <param name="targetTankNumber">Tank whose profile is replaced.</param>
        /// <param name="profileData">"Height0:Volume0,Height1:Volume1,..." using '.' as decimal separator.</param>
        /// <returns>True when the tanks could be reloaded afterwards, otherwise false.</returns>
        /// <exception cref="ArgumentException"><paramref name="profileData"/> is empty or malformed.</exception>
        /// <exception cref="InvalidOperationException">No tank config exists for the tank number.</exception>
        [UniversalApi(Description = "add a profile data for a tank.", InputParametersExampleJson = "[1,\"Height0:Volume0,Height1:Volume1,Height2:Volume2...\"]")]
        public async Task<bool> AddTankProfileDataAsync(byte targetTankNumber, string profileData)
        {
            if (string.IsNullOrEmpty(profileData))
                throw new ArgumentException("must provide profileData");

            // Format check for "Height0:Volume0,Height1:Volume1,Height2:Volume2...".
            if (!Regex.IsMatch(profileData, @"^(\d+(\.\d+)?:\s?\d+(\.\d+)?,\s?)+\d+(\.\d+)?:\d+(\.\d+)?$"))
                throw new ArgumentException("Data string content format is not correct");

            string batchLabel = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
            var dbContext = this.DbContextFactory.CreateDbContext();
            var targetTankConfig = dbContext.Set<TankConfig>()
                .FirstOrDefault(tc => tc.TankNumber == targetTankNumber);
            if (targetTankConfig == null)
                throw new InvalidOperationException("Could not find tank config with tankNumber: " + targetTankNumber);

            // The regex only admits '.' decimals, so parse culture-independently.
            var invariant = System.Globalization.CultureInfo.InvariantCulture;
            var tankProfileDatas = profileData.Split(',')
                .Select(part =>
                {
                    var heightAndVolume = part.Split(':');
                    return new TankProfileData()
                    {
                        TankConfig = targetTankConfig,
                        Height = double.Parse(heightAndVolume[0], invariant),
                        Volume = double.Parse(heightAndVolume[1], invariant),
                        BatchLabel = batchLabel,
                    };
                })
                .ToList();
            dbContext.Set<TankProfileData>().AddRange(tankProfileDatas);

            var deleting = dbContext.Set<TankProfileData>()
                .Where(d => d.TankConfigId == targetTankConfig.Id && d.BatchLabel != batchLabel);
            dbContext.Set<TankProfileData>().RemoveRange(deleting);
            await dbContext.SaveChangesAsync();

            var tankConfigs = await LoadLatestTankConfigsAsync(dbContext.TankConfigs);
            var tankOverallConfig = await dbContext.Set<TankOverallConfig>()
                .OrderByDescending(c => c.ModifiedTimeStamp)
                .ThenByDescending(c => c.CreatedTimeStamp)
                .FirstOrDefaultAsync();

            return await this.ReloadTanksAndNotifyAsync(
                tankConfigs,
                tankOverallConfig,
                "Necessary tank configs were missed or invalid after the AddTankProfileData(...), fix them and wait for notify.",
                "All tanks reloaded due to TankProfileData updated");
        }

        /// <summary>
        /// Opens ("start") or closes ("stop") a manual delivery record for a tank, using the most recent
        /// buffered reading as the starting/ending values, and raises OnStateChange.
        /// </summary>
        /// <param name="targetTankNumber">Tank the delivery applies to.</param>
        /// <param name="operation">"start" or "stop" (case-insensitive).</param>
        /// <returns>The reading that was recorded on the delivery.</returns>
        /// <exception cref="ArgumentException"><paramref name="operation"/> is not "start" or "stop".</exception>
        /// <exception cref="InvalidOperationException">
        /// No fresh/valid reading is available, the tank is unknown, or the delivery records do not allow the operation.
        /// </exception>
        [UniversalApi(Description = "start or stop a manual delivery", InputParametersExampleJson = "[1,\"start\"]")]
        public async Task<TankReading> StartOrStopManualDeliveryAsync(byte targetTankNumber, string operation)
        {
            bool isStart = string.Equals(operation, "start", StringComparison.OrdinalIgnoreCase);
            bool isStop = string.Equals(operation, "stop", StringComparison.OrdinalIgnoreCase);
            if (!isStart && !isStop)
                throw new ArgumentException("must provide valid operation string 'start' or 'stop'");

            // A delivery record needs a reading; fail clearly when the tank has none buffered yet.
            if (!this.tankReadings_fast_buffer.TryGetValue(targetTankNumber, out List<Tuple<DateTime, TankReading>> readings))
                throw new InvalidOperationException("Could not get most recent tank reading for target tank, may retry later?");

            var mostRecentReading = readings.LastOrDefault();
            if (mostRecentReading == null
                // max tolerance is 4 cycles of fast polling missing.
                || DateTime.Now.Subtract(mostRecentReading.Item1).TotalMilliseconds >= this.polling_fast_TankReadingTimer_Internval * 4)
                throw new InvalidOperationException("Could not get most recent tank reading for target tank, may retry later?");
            if (mostRecentReading.Item2.Volume == null || mostRecentReading.Item2.WaterVolume == null)
                throw new InvalidOperationException("Most recent tank reading for target tank is invalid, may retry later?");

            var reading = mostRecentReading.Item2;
            var dbContext = this.DbContextFactory.CreateDbContext();
            var lastDeliveryRecord = await dbContext.Deliveries
                .Where(d => d.TankNumber == targetTankNumber)
                .OrderByDescending(d => d.StartingDateTime)
                .FirstOrDefaultAsync();
            var targetTank = this.Tanks?.FirstOrDefault(t => t.TankNumber == targetTankNumber);
            if (targetTank == null)
                throw new InvalidOperationException("Could not find tank with tankNumber: " + targetTankNumber);

            AtgStateChangeEventArg onStateChangeEventArg;
            if (isStart)
            {
                // The last delivery is still open.
                if (lastDeliveryRecord != null && lastDeliveryRecord.EndingDateTime == null)
                    throw new InvalidOperationException($"Previous Delivery(started "
                        + $"from: {lastDeliveryRecord.StartingDateTime.ToString("yyyy-MM-dd HH:mm:ss")}) "
                        + $"is still opening, close that and retry");

                var openDelivery = new Model.Delivery()
                {
                    TankNumber = targetTankNumber,
                    StartingDateTime = DateTime.Now,
                    StartingFuelHeight = reading.Height ?? -1,
                    StartingFuelVolume = reading.Volume ?? -1,
                    StartingFuelTCVolume = reading.TcVolume ?? -1,
                    StartingTemperture = reading.Temperature ?? double.MinValue,
                    StartingWaterHeight = reading.Water ?? -1,
                    StartingWaterVolume = reading.WaterVolume ?? -1,
                };
                dbContext.Deliveries.Add(openDelivery);
                await dbContext.SaveChangesAsync();

                targetTank.State = TankState.Delivering;
                onStateChangeEventArg = new AtgStateChangeEventArg(
                    targetTank,
                    this.State,
                    $"Manual delivery is starting on Tank with TankNumber: {targetTankNumber} with "
                    + $"StartingFuelVolume: {openDelivery.StartingFuelVolume}");
            }
            else
            {
                // There must be an open delivery to close.
                if (lastDeliveryRecord == null)
                    throw new InvalidOperationException($"There's no start delivery record exists.");
                if (lastDeliveryRecord.EndingDateTime != null)
                    throw new InvalidOperationException($"The last Delivery record is not opened.");

                lastDeliveryRecord.TankNumber = targetTankNumber;
                lastDeliveryRecord.EndingDateTime = DateTime.Now;
                lastDeliveryRecord.EndingFuelHeight = reading.Height ?? -1;
                lastDeliveryRecord.EndingFuelVolume = reading.Volume ?? -1;
                lastDeliveryRecord.EndingFuelTCVolume = reading.TcVolume ?? -1;
                lastDeliveryRecord.EndingTemperture = reading.Temperature ?? double.MinValue;
                lastDeliveryRecord.EndingWaterHeight = reading.Water ?? -1;
                lastDeliveryRecord.EndingWaterVolume = reading.WaterVolume ?? -1;
                dbContext.Deliveries.Update(lastDeliveryRecord);
                await dbContext.SaveChangesAsync();

                targetTank.State = TankState.Idle;
                onStateChangeEventArg = new AtgStateChangeEventArg(
                    targetTank,
                    this.State,
                    $"Manual delivery has finished on Tank with TankNumber: {targetTankNumber} with "
                    + $"EndingFuelVolume: {lastDeliveryRecord.EndingFuelVolume}");
            }

            this.OnStateChange?.Invoke(this, onStateChangeEventArg);
            var universalApiHub = this.services.GetRequiredService<UniversalApiHub>();
            await universalApiHub.FireEvent(this, "OnStateChange", onStateChangeEventArg);
            return reading;
        }

        #endregion

        #region UniversalApi - Service

        /// <summary>
        /// Returns a mapped copy of the most recent buffered reading of a tank. When the tank has no
        /// buffer entry at all, the mapper is invoked with null.
        /// </summary>
        /// <param name="tankNumber">The tank number, like 1, 2, 3...</param>
        /// <exception cref="InvalidOperationException">The buffered reading is missing, stale or invalid.</exception>
        [UniversalApi(Description = "", InputParametersExampleJson = "[1]")]
        public async Task<TankReading> GetTankReadingAsync(int tankNumber)
        {
            TankReading mostRecentCachedTankReading = null;
            if (this.tankReadings_fast_buffer.TryGetValue(tankNumber, out List<Tuple<DateTime, TankReading>> cachedReadings))
            {
                var mostRecentReading = cachedReadings.LastOrDefault();
                if (mostRecentReading == null
                    // max tolerance here is 2 cycles of fast polling missing.
                    || DateTime.Now.Subtract(mostRecentReading.Item1).TotalMilliseconds >= this.polling_fast_TankReadingTimer_Internval * 2)
                    throw new InvalidOperationException("Could not get most recent tank reading for target tank, may retry later?");
                if (mostRecentReading.Item2.Volume == null || mostRecentReading.Item2.WaterVolume == null)
                    throw new InvalidOperationException("Most recent tank reading for target tank is invalid, may retry later?");
                mostRecentCachedTankReading = mostRecentReading.Item2;
            }

            var mapper = this.services.GetRequiredService<IMapper>();
            return mapper.Map<TankReading>(mostRecentCachedTankReading);
        }

        /// <summary>Returns one page of a tank's deliveries, newest first, starting at or after <paramref name="filterTimestamp"/>.</summary>
        [UniversalApi(Description = "", InputParametersExampleJson = "[1,8,0,\"2020-04-01T18:25:43.511Z\"]")]
        public async Task<IEnumerable<Edge.Core.IndustryStandardInterface.ATG.Delivery>> GetTankDeliveryAsync(
            int tankNumber, int pageRowCount = 10, int pageIndex = 0, DateTime? filterTimestamp = null)
        {
            var earliest = filterTimestamp ?? DateTime.MinValue;
            var dbContext = this.DbContextFactory.CreateDbContext();
            var mostRecentDeliveries = await dbContext.Deliveries
                .Where(d => d.TankNumber == tankNumber)
                .Where(d => d.StartingDateTime >= earliest)
                .OrderByDescending(d => d.StartingDateTime)
                .Skip(pageRowCount * pageIndex)
                .Take(pageRowCount)
                .ToListAsync();
            var mapper = this.services.GetRequiredService<IMapper>();
            return mapper.Map<IEnumerable<Edge.Core.IndustryStandardInterface.ATG.Delivery>>(mostRecentDeliveries);
        }

        /// <summary>Returns one page of a tank's alarms, newest first, created at or after <paramref name="filterTimestamp"/>.</summary>
        [UniversalApi(Description = "", InputParametersExampleJson = "[1,8,0,\"2020-04-01T18:25:43.511Z\"]")]
        public async Task<IEnumerable<Edge.Core.IndustryStandardInterface.ATG.Alarm>> GetTankAlarmAsync(
            int tankNumber, int pageRowCount = 10, int pageIndex = 0, DateTime? filterTimestamp = null)
        {
            var earliest = filterTimestamp ?? DateTime.MinValue;
            var mapper = this.services.GetRequiredService<IMapper>();
            var dbContext = this.DbContextFactory.CreateDbContext();
            var dbAlarms = await dbContext.Alarms
                .Where(al => al.TankNumber == tankNumber)
                .Where(d => d.CreatedTimeStamp >= earliest)
                .OrderByDescending(d => d.CreatedTimeStamp)
                .Skip(pageRowCount * pageIndex)
                .Take(pageRowCount)
                .ToArrayAsync();
            return mapper.Map<IEnumerable<Edge.Core.IndustryStandardInterface.ATG.Alarm>>(dbAlarms);
        }

        /// <summary>Returns one page of a tank's saved inventories, newest first, taken at or after <paramref name="filterTimestamp"/>.</summary>
        [UniversalApi(Description = "", InputParametersExampleJson = "[1,8,0,\"2020-04-01T18:25:43.511Z\"]")]
        public async Task<IEnumerable<Edge.Core.IndustryStandardInterface.ATG.Inventory>> GetTankInventoryAsync(
            int tankNumber, int pageRowCount = 10, int pageIndex = 0, DateTime? filterTimestamp = null)
        {
            var earliest = filterTimestamp ?? DateTime.MinValue;
            var mapper = this.services.GetRequiredService<IMapper>();
            var dbContext = this.DbContextFactory.CreateDbContext();
            var results = await dbContext.Inventories
                .Where(i => i.TankNumber == tankNumber)
                .Where(d => d.TimeStamp >= earliest)
                .OrderByDescending(d => d.TimeStamp)
                .Skip(pageRowCount * pageIndex)
                .Take(pageRowCount)
                .ToArrayAsync();
            return mapper.Map<IEnumerable<Edge.Core.IndustryStandardInterface.ATG.Inventory>>(results);
        }

        /// <summary>Returns the currently loaded tanks, or null when none are loaded.</summary>
        [UniversalApi()]
        public Task<IEnumerable<Tank>> GetTanksAsync()
        {
            return Task.FromResult(this.richTankInfos?.Select(c => c.Tank));
        }

        public Task<IEnumerable<TankReading>> GetTanksReadingAsync()
        {
            throw new NotImplementedException();
        }

        #endregion

        #region Polling and readings

        /// <summary>
        /// One fast polling cycle: reads every tank, appends the readings to the buffer (dropping entries
        /// older than the buffer limit) and saves inventories once the sampling interval has elapsed.
        /// </summary>
        private async Task PollAllTanksAsync(IEnumerable<RichTankInfo> tanks)
        {
            foreach (var richTankInfo in tanks)
            {
                var tankNumber = richTankInfo.Tank.TankNumber;
                var tankReading = await this.InternalGetTankReadingAsync(tankNumber);
                var now = DateTime.Now;
                if (!this.tankReadings_fast_buffer.TryGetValue(tankNumber, out List<Tuple<DateTime, TankReading>> readings))
                {
                    readings = new List<Tuple<DateTime, TankReading>>();
                    this.tankReadings_fast_buffer.Add(tankNumber, readings);
                }

                readings.Add(new Tuple<DateTime, TankReading>(now, tankReading));
                readings.RemoveAll(i => now.Subtract(i.Item1).TotalSeconds > this.polling_fast_TankReadingTimer_Buffer_MaxLength_By_Second);
            }

            // Check whether the inventory sampling interval elapsed; if yes, save one inventory per tank.
            var samplingInterval = tanks.FirstOrDefault()?.TankOverallConfig.InventorySamplingInterval ?? 60000;
            if (DateTime.Now.Subtract(this.lastInventoriesSavedIntoDatabaseTime).TotalMilliseconds < samplingInterval)
                return;

            try
            {
                // Iterate a snapshot: the buffer can be cleared by a tank reload while a save is awaited.
                foreach (var data in this.tankReadings_fast_buffer.ToList())
                {
                    var latest = data.Value.LastOrDefault();
                    if (latest == null)
                        continue;
                    var dbContext = this.DbContextFactory.CreateDbContext();
                    dbContext.Inventories.Add(Model.Inventory.From(data.Key, latest.Item2, latest.Item1));
                    await dbContext.SaveChangesAsync();
                }
            }
            finally
            {
                // Advance even on failure so a persistent database error is not retried every cycle.
                this.lastInventoriesSavedIntoDatabaseTime = DateTime.Now;
            }
        }

        /// <summary>
        /// Reads the probe bound to a tank and converts the probe values into a tank reading (volumes,
        /// temperature-compensated volume, ullage); stores and raises any resulting alarms.
        /// Calls are serialized so only one probe read runs at a time.
        /// </summary>
        /// <exception cref="ArgumentException">The tank, or the probe handler bound to it, cannot be found.</exception>
        private async Task<TankReading> InternalGetTankReadingAsync(int tankNumber)
        {
            await this.getTankReadingGate.WaitAsync();
            try
            {
                var targetRichTankInfo = this.richTankInfos?.FirstOrDefault(ti => ti.Tank.TankNumber == tankNumber);
                if (targetRichTankInfo == null)
                    throw new ArgumentException("Could not find Tank with tankNumber: " + tankNumber);

                var targetProbeHandler = this.probeHandlers.FirstOrDefault(ph =>
                    ph.Probe.DeviceId == targetRichTankInfo.TankConfig.ProbeConfig.DeviceId);
                if (targetProbeHandler == null)
                    throw new ArgumentException("Could not find ProbeHandler which bound to Tank with tankNumber: " + tankNumber);

                var probeReading = await targetProbeHandler.GetProbeReadingAsync();
                var volumeCaculator = new HeightToVolumeCaculator(targetRichTankInfo.TankConfig.TankProfileDatas);
                var tcCaculator = new TemperatureCompensationCaculator(targetRichTankInfo.TankConfig.ThermalCoefficient);

                // A missing height/water value is treated as 0.
                // NOTE(review): assumes GetVolume is a pure lookup, so each volume is computed once — confirm.
                var totalVolume = volumeCaculator.GetVolume(probeReading.Height ?? 0);
                var waterVolume = volumeCaculator.GetVolume(probeReading.Water ?? 0);
                var fuelVolume = totalVolume - waterVolume;
                var temperature = probeReading.Temperature.FirstOrDefault();
                var maxProfileHeight = targetRichTankInfo.TankConfig.TankProfileDatas.Max(p => p.Height);

                var tankReading = new TankReading()
                {
                    Density = probeReading.Density,
                    Water = probeReading.Water,
                    Height = probeReading.Height,
                    Temperature = temperature,
                    Volume = fuelVolume,
                    WaterVolume = waterVolume,
                    TcVolume = tcCaculator.CaculateCompensatedVolume(
                        targetRichTankInfo.TankOverallConfig.TcReference,
                        fuelVolume,
                        temperature),
                    Ullage = volumeCaculator.GetVolume(maxProfileHeight) - totalVolume,
                };

                // NOTE(review): alarms are only written when at least one is active, so a tank whose
                // alarms all disappeared never reaches the "mark as cleared" path — confirm intent.
                var alarms = this.ExtractTankAlarms(tankReading, targetRichTankInfo.TankConfig.TankLimitConfig);
                if (alarms.Any())
                {
                    await this.UpsertAlarmsInDatabase(alarms);
                    this.OnAlarm?.Invoke(this, new AtgAlarmEventArg(tankNumber, alarms));
                }

                return tankReading;
            }
            finally
            {
                this.getTankReadingGate.Release();
            }
        }

        /// <summary>
        /// Compares a reading against the tank limits and returns the alarms it triggers.
        /// A check whose reading value is missing (null) raises no alarm.
        /// </summary>
        /// <exception cref="ArgumentNullException">Either argument is null.</exception>
        private IEnumerable<Edge.Core.IndustryStandardInterface.ATG.Alarm> ExtractTankAlarms(TankReading tankReading, TankLimitConfig tankLimitConfig)
        {
            if (tankReading == null)
                throw new ArgumentNullException(nameof(tankReading));
            if (tankLimitConfig == null)
                throw new ArgumentNullException(nameof(tankLimitConfig));

            var alarms = new List<Edge.Core.IndustryStandardInterface.ATG.Alarm>();
            if (tankLimitConfig.FuelTemperatureHighLimit <= tankReading.Temperature)
            {
                alarms.Add(new Edge.Core.IndustryStandardInterface.ATG.Alarm(AlarmPriority.Alarm, AlarmType.TankHighTemperatureWarning));
            }

            if (tankLimitConfig.FuelTemperatureLowLimit >= tankReading.Temperature)
            {
                alarms.Add(new Edge.Core.IndustryStandardInterface.ATG.Alarm(AlarmPriority.Alarm, AlarmType.TankColdTemperatureWarning));
            }

            // NOTE(review): LowProduct is compared to the volume directly while HighProduct is multiplied
            // by MaxVolume; with the sample config (MaxVolume 1100, HighProduct 900) this threshold looks
            // unreachable — confirm whether HighProduct is a ratio or a volume.
            if (tankLimitConfig.MaxVolume * tankLimitConfig.HighProduct <= tankReading.Volume)
            {
                alarms.Add(new Edge.Core.IndustryStandardInterface.ATG.Alarm(AlarmPriority.Alarm, AlarmType.TankHighProductAlarm));
            }

            if (tankLimitConfig.LowProduct >= tankReading.Volume)
            {
                alarms.Add(new Edge.Core.IndustryStandardInterface.ATG.Alarm(AlarmPriority.Alarm, AlarmType.TankLowProductAlarm));
            }

            if (tankLimitConfig.HighWaterAlarm <= tankReading.Water)
            {
                alarms.Add(new Edge.Core.IndustryStandardInterface.ATG.Alarm(AlarmPriority.Alarm, AlarmType.TankHighWaterAlarm));
            }

            if (tankLimitConfig.HighWaterWarning <= tankReading.Water)
            {
                alarms.Add(new Edge.Core.IndustryStandardInterface.ATG.Alarm(AlarmPriority.Warning, AlarmType.TankHighWaterWarning));
            }

            return alarms;
        }

        #endregion

        #region Configuration helpers

        /// <summary>
        /// Loads all tank configs with their related entities and keeps, per tank number, the most
        /// recently modified (then most recently created) one.
        /// </summary>
        private static async Task<IEnumerable<TankConfig>> LoadLatestTankConfigsAsync(IQueryable<TankConfig> allTankConfigs)
        {
            // Loaded fully and grouped in memory; the original noted this is required for the in-memory
            // database used by the unit tests.
            var loaded = await allTankConfigs
                .Include(tc => tc.TankLimitConfig)
                .Include(tc => tc.ProbeConfig)
                .Include(tc => tc.ProductConfig)
                .Include(tc => tc.TankProfileDatas)
                .ToListAsync();
            return loaded
                .GroupBy(tc => tc.TankNumber)
                .Select(tcGp => tcGp
                    .OrderByDescending(c => c.ModifiedTimeStamp)
                    .ThenByDescending(c => c.CreatedTimeStamp)
                    .FirstOrDefault())
                .ToList();
        }

        /// <summary>
        /// Rebuilds the rich tank infos from the given configs, updates <see cref="State"/> and notifies
        /// both the local OnStateChange subscribers and the universal API hub.
        /// </summary>
        /// <returns>True when the tanks were rebuilt, false when the configuration is missing or invalid.</returns>
        private async Task<bool> ReloadTanksAndNotifyAsync(
            IEnumerable<TankConfig> tankConfigs,
            TankOverallConfig tankOverallConfig,
            string missingConfigMessage,
            string reloadedMessage)
        {
            this.richTankInfos = this.TryReCreateRichTankInfos(tankConfigs, tankOverallConfig, this.probeHandlers);
            bool reloaded = this.richTankInfos != null;

            AtgStateChangeEventArg onStateChangeEventArg;
            if (reloaded)
            {
                this.State = AtgState.Idle;
                onStateChangeEventArg = new AtgStateChangeEventArg(AtgState.TanksReloaded, reloadedMessage);
            }
            else
            {
                this.State = AtgState.Inoperative_MissingConfig;
                onStateChangeEventArg = new AtgStateChangeEventArg(this.State, missingConfigMessage);
            }

            this.OnStateChange?.Invoke(this, onStateChangeEventArg);
            var universalApiHub = this.services.GetRequiredService<UniversalApiHub>();
            await universalApiHub.FireEvent(this, "OnStateChange", onStateChangeEventArg);
            return reloaded;
        }

        /// <summary>
        /// Builds one <see cref="RichTankInfo"/> per tank config.
        /// </summary>
        /// <returns>
        /// The materialized list, or null when any input is missing, any tank config lacks its probe,
        /// product, limit or profile data, or constructing a tank info fails.
        /// </returns>
        private IEnumerable<RichTankInfo> TryReCreateRichTankInfos(
            IEnumerable<TankConfig> tankConfigs,
            TankOverallConfig tankOverallConfig,
            IEnumerable<IProbeHandler> probeHandlers)
        {
            if (tankConfigs == null || tankOverallConfig == null || probeHandlers == null)
                return null;

            var configs = tankConfigs.ToList();
            if (!configs.Any()
                || configs.Any(tc => tc.ProbeConfig == null
                    || tc.ProductConfig == null
                    || tc.TankLimitConfig == null
                    || tc.TankProfileDatas == null))
                return null;

            try
            {
                // Materialize now: a deferred Select would run outside this try block and would build
                // new RichTankInfo/Tank instances on every enumeration, losing state such as Tank.State.
                return configs
                    .Select(tc => new RichTankInfo(
                        this.services,
                        tc,
                        tankOverallConfig,
                        probeHandlers.FirstOrDefault(ph => ph.Probe.DeviceId == tc.ProbeConfig.DeviceId)))
                    .ToList();
            }
            catch (Exception ex)
            {
                this.logger.LogError(ex, "ATG_Classic_App failed to create tank infos from the tank configs");
                return null;
            }
        }

        #endregion

        #region Database maintenance and alarms

        /// <summary>Deletes inventories, alarms and deliveries older than two years (no special reason for that span).</summary>
        private async Task PurgeDatabase()
        {
            int maxKeepDays = 365 * 2;
            DateTime keepHistoryFrom = DateTime.Now.Subtract(new TimeSpan(maxKeepDays, 0, 0, 0));
            var dbContext = this.DbContextFactory.CreateDbContext();

            var deletingInventories = dbContext.Inventories.Where(i => i.TimeStamp < keepHistoryFrom);
            dbContext.RemoveRange(deletingInventories);

            var deletingAlarms = dbContext.Alarms.Where(a => a.CreatedTimeStamp < keepHistoryFrom);
            dbContext.RemoveRange(deletingAlarms);

            var deletingDelieveries = dbContext.Deliveries.Where(d => d.StartingDateTime < keepHistoryFrom);
            dbContext.RemoveRange(deletingDelieveries);

            await dbContext.SaveChangesAsync();
        }

        /// <summary>Treats two alarms as equal when tank number, type, priority and description all match.</summary>
        private class AlarmEqualityComparer : IEqualityComparer<Edge.Core.IndustryStandardInterface.ATG.Alarm>
        {
            public bool Equals(
                [AllowNull] Edge.Core.IndustryStandardInterface.ATG.Alarm x,
                [AllowNull] Edge.Core.IndustryStandardInterface.ATG.Alarm y)
            {
                // Same reference, or both null, is equal by the IEqualityComparer contract.
                if (ReferenceEquals(x, y))
                    return true;
                if (x == null || y == null)
                    return false;
                return x.TankNumber == y.TankNumber
                    && x.Type == y.Type
                    && x.Priority == y.Priority
                    && x.Description == y.Description;
            }

            public int GetHashCode([DisallowNull] Edge.Core.IndustryStandardInterface.ATG.Alarm obj)
            {
                return (int)obj.TankNumber ^ (int)obj.Type ^ (int)obj.Priority;
            }
        }

        /// <summary>
        /// Updates or inserts alarms in the database, per tank number found in <paramref name="realTimeAlarms"/>.
        /// For the tank's active (not cleared) database alarms of the last 30 days:
        /// an active alarm with no equal real time alarm is marked as cleared;
        /// an active alarm that still has an equal real time alarm is left untouched;
        /// a real time alarm with no equal active alarm is added.
        /// </summary>
        /// <param name="realTimeAlarms">Alarms currently detected.</param>
        protected virtual async Task UpsertAlarmsInDatabase(IEnumerable<Edge.Core.IndustryStandardInterface.ATG.Alarm> realTimeAlarms)
        {
            var mapper = this.services.GetRequiredService<IMapper>();
            var comparer = new AlarmEqualityComparer();
            foreach (var g in realTimeAlarms.GroupBy(a => a.TankNumber))
            {
                var dbContext = this.DbContextFactory.CreateDbContext();
                var now = DateTime.Now;

                // Only the latest 30 days are searched, for performance. The cutoff is computed up
                // front so the filter is a plain column comparison.
                var searchFrom = now.AddDays(-30);
                var dbActiveAlarms = mapper.Map<IEnumerable<Edge.Core.IndustryStandardInterface.ATG.Alarm>>(
                        dbContext.Alarms.Where(al => al.ClearedTimeStamp == null
                            && al.TankNumber == g.Key
                            && al.CreatedTimeStamp >= searchFrom))
                    .ToList();

                var clearedAlarms = mapper.Map<IEnumerable<Model.Alarm>>(
                        dbActiveAlarms.Except(g, comparer).ToList())
                    .ToList();
                clearedAlarms.ForEach(c => c.ClearedTimeStamp = now);

                // Compare against this tank's real time alarms only; using the full input would add
                // other tanks' alarms again in every group.
                var addedAlarms = mapper.Map<IEnumerable<Model.Alarm>>(
                        g.Except(dbActiveAlarms, comparer).ToList())
                    .ToList();

                if (clearedAlarms.Any())
                    dbContext.UpdateRange(clearedAlarms);
                if (addedAlarms.Any())
                    await dbContext.AddRangeAsync(addedAlarms);
                await dbContext.SaveChangesAsync();
            }
        }

        #endregion
    }
}