using Dfs.WayneChina.SpsDataCourier.SpsData; using System; using System.Collections.Generic; using System.Text; using System.Linq; using Microsoft.EntityFrameworkCore; using System.Threading.Tasks; using System.Timers; using Microsoft.Extensions.Logging; using System.Collections.Concurrent; namespace Dfs.WayneChina.SpsDataCourier { public class DbMonitor { #region Fields private Timer timer; private int interval; private bool smartFuel05Compatible; private Queue sendQueue = new Queue(); private object syncObj = new object(); private ConcurrentQueue processedTransQueue = new ConcurrentQueue(); private string connectionString; #endregion #region Events public event EventHandler OnCardTrxCreated; public event EventHandler OnAccountCreated; public event EventHandler OnAccountUpdated; public event EventHandler OnCardInfoCreated; public event EventHandler OnCardInfoUpdated; public event EventHandler OnCardInfoDeleted; //账户/卡充值记录,卡扣款记录, card top-up and reductions public event EventHandler OnRechargeCreated; //挂失解挂记录, lost card and found card public event EventHandler OnLostCardCreated; //注销卡, closed/deactivated card public event EventHandler OnClosedCardCreated; //基础黑名单卡, black card public event EventHandler OnBaseBlackCardCreated; public event EventHandler OnBaseBlackCardDeleted; //增量黑名单卡, blacklisted card public event EventHandler OnBlacklistedCardCreated; public event EventHandler OnBlacklistedCardUpdated; public event EventHandler OnBlacklistedCardDeleted; //减量黑名单卡, released card public event EventHandler OnReleasedCardCreated; //Card number in record public event EventHandler OnReleasedCardDeleted; public event EventHandler OnGrayInfoCreated; public event EventHandler OnGrayInfoDeleted; public event EventHandler OnMultipleAccountsCreated; public event EventHandler OnMultipleAccountsUpdated; public event EventHandler OnMultipleCardsCreated; public event EventHandler OnMultipleCardsUpdated; #endregion #region Logger NLog.Logger logger = NLog.LogManager.LoadConfiguration("NLog.config").GetLogger("SpsDataCourier"); #endregion #region Constructor public DbMonitor(int scanInterval, string connectionString, bool sf05Compatible) { interval = scanInterval; this.connectionString = connectionString; smartFuel05Compatible = sf05Compatible; timer = new Timer(); timer.Interval = interval * 1000; timer.Elapsed += Timer_Elapsed; } #endregion public void Start() { logger.Info("Starting database monitor"); timer.Start(); } public void Stop() { logger.Info("Stopping database monitor"); timer.Stop(); } private async void Timer_Elapsed(object sender, ElapsedEventArgs e) { logger.Debug("Start to scan database"); await CheckDbAsync(); } public bool AddToQueue(long gid) { lock (syncObj) { foreach (var item in sendQueue) { logger.Info($"Item in the send queue: {item}"); } if (sendQueue.Contains(gid)) { logger.Info($"Trying to enqueue gid {gid}, but it's being processed already"); return false; } sendQueue.Enqueue(gid); logger.Info($"After check, Gid: {gid} added to queue"); return true; } } public void ClearFromQueue(TTableaudit tableaudit) { if(tableaudit.TransCreated != null && tableaudit.TransCreated.Value != 0) { lock (syncObj) { logger.Info($"Removing {tableaudit.TransCreated.Value} from send queue"); sendQueue = new Queue(sendQueue.Where(i => i != tableaudit.TransCreated.Value)); } } } public void ClearFromQueue(long gid) { lock (syncObj) { logger.Info($"Removing {gid} from send queue"); sendQueue = new Queue(sendQueue.Where(i => i != gid)); } } public void AddToProcessedQueue(long gid) { logger.Info($"Adding {gid} to ProcessedQueue"); if (processedTransQueue.Count > 10) { long toBeRemoved = 0; processedTransQueue.TryDequeue(out toBeRemoved); logger.Info($"To be removed from ProcessedQueue: {toBeRemoved}"); } processedTransQueue.Enqueue(gid); } public bool IsTransAlreadyUploaded(long gid) { return processedTransQueue.Contains(gid); } private async Task CheckDbAsync() { timer.Stop(); //await CheckDataVersionsAsync(); try { using (var context = new SpsDbContext(connectionString)) { var allRecords = await context.TTableaudit.ToListAsync(); // Check the abundance of `deleted`, `created` and `updated` card records or accounts. var deletedCards = allRecords.Where(t => t.CardInfoDeleted != "0"); if (deletedCards.Count() > 0) logger.Info($"Deleted cards count: {deletedCards.Count()}"); //Remove the 'DeletedCard' records, can't get them anyway... try { context.TTableaudit.RemoveRange(allRecords.Where(t => t.CardInfoDeleted != "0")); await context.SaveChangesAsync(); } catch (Exception ex) { logger.Error($"Exception in removing the DELETED CARD records, {ex.ToString()}"); } ///////////////////////////////////////////////////// //upload all `created` accounts var createdAccounts = allRecords.Where(t => t.AccountCreated != 0); if (createdAccounts.Count() > 0) logger.Info($"Multiple created accounts, count: {createdAccounts.Count()}"); if (createdAccounts.Count() > 5) { List createdAccountInfo = new List(); foreach (var r in createdAccounts) { var expectedAccountInfo = context.TAcctinfo .AsNoTracking() .FirstOrDefault(a => a.Gid == Convert.ToUInt64(r.AccountCreated.Value)); if (expectedAccountInfo != null) createdAccountInfo.Add(expectedAccountInfo); } if (createdAccountInfo.Count > 0) { ProcessMultipleCreatedAccounts(createdAccountInfo, createdAccounts); } } ///////////////////////////////////////////////////// //upload all `updated` accounts var updatedAccounts = allRecords.Where(t => t.AccountUpdated != 0); if (updatedAccounts.Count() > 0) logger.Info($"Multiple updated accounts, count: {updatedAccounts.Count()}"); if (updatedAccounts.Count() > 5) { List updatedAccountInfo = new List(); foreach (var r in updatedAccounts) { var expectedAccountInfo = context.TAcctinfo .AsNoTracking() .FirstOrDefault(a => a.Gid == Convert.ToUInt64(r.AccountUpdated.Value)); if (expectedAccountInfo != null) updatedAccountInfo.Add(expectedAccountInfo); } if (updatedAccountInfo.Count > 0) { ProcessMultipleUpdatedAccounts(updatedAccountInfo, updatedAccounts); } } ///////////////////////////////////////////////////// //upload all `created` cards var createdCards = allRecords.Where(t => t.CardInfoCreated != 0); if (createdCards.Count() > 0) logger.Info($"Multiple created cards count: {createdCards.Count()}"); if (createdCards.Count() > 5) { List createdCardInfo = new List(); foreach (var r in createdCards) { var expectedCardInfo = context.TCardinfo .AsNoTracking() .FirstOrDefault(c => c.Gid == Convert.ToUInt64(r.CardInfoCreated.Value)); if (expectedCardInfo != null) createdCardInfo.Add(expectedCardInfo); } if (createdCardInfo.Count > 0) { logger.Info($"Found multiple created card info records, count: {createdCardInfo.Count}"); ProcessMultipleCreatedCards(createdCardInfo, createdCards); } } ///////////////////////////////////////////////////// //upload all `updated` cards var updatedCards = allRecords .Where(t => t.CardInfoUpdated != 0) .Distinct(new GenericComparer(a => a.CardInfoUpdated.Value)); if (updatedCards.Count() > 0) { logger.Info($"Multiple updated cards, count: {updatedCards.Count()}"); } if (updatedCards.Count() > 5) { List updatedCardInfo = new List(); foreach (var r in updatedCards) { var expectedCardInfo = context.TCardinfo .AsNoTracking() .FirstOrDefault(c => c.Gid == Convert.ToUInt64(r.CardInfoUpdated.Value)); if (expectedCardInfo != null) updatedCardInfo.Add(expectedCardInfo); } if (updatedCardInfo.Count > 0) { ProcessMultipleUpdatedCards(updatedCardInfo, updatedCards); } } //////////////////////////////////////////////// //////////////////////////////////////////////// // Just to refresh here allRecords = await context.TTableaudit.ToListAsync(); var deletedCardsCount1 = allRecords.Where(t => t.CardInfoDeleted != "0").Count(); if (deletedCardsCount1 > 0) logger.Info($"After refresh, delete cards count: {deletedCardsCount1}"); var recordCount = allRecords.Count; if (recordCount > 0) { var record = await context.TTableaudit.FirstAsync(); if (record.RechargeCreated.HasValue && record.RechargeCreated.Value != 0) { logger.Info($"New recharge or reduction record, Gid: {record.RechargeCreated.Value}"); ProcessRecharge(record); } else if (record.CardInfoCreated.HasValue && record.CardInfoCreated.Value != 0) { logger.Info($"New CardInfo record, Gid: {record.CardInfoCreated.Value}"); ProcessCardInfoCreated(record); } else if (record.CardInfoUpdated.HasValue && record.CardInfoUpdated.Value != 0) { logger.Info($"CardInfo updated, Gid: {record.CardInfoUpdated.Value}"); ProcessCardInfoUpdated(record); } else if (record.CardInfoDeleted != "0") { logger.Info($"CardInfo deleted, CardNo: {record.CardInfoDeleted}"); ProcessCardInfoDeleted(record); } else if (record.AccountCreated.HasValue && record.AccountCreated.Value != 0) { logger.Info($"New account created, Gid: {record.AccountCreated.Value}"); ProcessAccountCreated(record); } else if (record.AccountUpdated.HasValue && record.AccountUpdated.Value != 0) { logger.Info($"Account updated on GID: {record.AccountUpdated.Value}"); ProcessAccountUpdated(record); } else if (record.ClosedCardCreated.HasValue && record.ClosedCardCreated.Value != 0) { logger.Info($"Card closed/deactivated, Gid: {record.ClosedCardCreated.Value}"); ProcessClosedCard(record); } else if (record.LostCardCreated.HasValue && record.LostCardCreated.Value != 0) { logger.Info($"Card lost or found registerred, Gid: {record.LostCardCreated.Value}"); ProcessLostCardCreated(record); } else if (record.BlacklistedCardCreated.HasValue && record.BlacklistedCardCreated.Value != 0) { logger.Info($"A blacklisted card is registerred, Gid: {record.BlacklistedCardCreated.Value}"); ProcessBlacklistedCardCreated(record); } else if (record.BlacklistedCardUpdated.HasValue && record.BlacklistedCardUpdated.Value != 0) { logger.Info($"A card which was registered as LOST CARD is released, Gid: {record.BlacklistedCardUpdated.Value}"); ProcessBlacklistedCardUpdated(record); } else if (record.BlacklistedCardDeleted != "0") { logger.Info($"A blacklisted card was deleted, CardNo: {record.BlacklistedCardDeleted}"); ProcessBlacklistedCardDeleted(record); } else if (record.ReleasedCardCreated.HasValue && record.ReleasedCardCreated.Value != 0) { logger.Info($"A lost card was found and released, Gid: {record.ReleasedCardCreated.Value}"); ProcessReleasedCardCreated(record); } else if (record.ReleasedCardDeleted != "0") { logger.Info($"A released card is deleted, card no: {record.ReleasedCardDeleted}"); ProcessReleasedCardDeleted(record); } else if (record.BaseBlackCardCreated.HasValue && record.BaseBlackCardCreated.Value != 0) { logger.Info($"A base black card is created, gid: {record.BaseBlackCardCreated.Value}"); ProcessBaseBlackCardCreated(record); } else if (record.BaseBlackCardDeleted != "0") { logger.Info($"A base black card is deleted, card no: {record.BaseBlackCardDeleted}"); ProcessBaseBlackCardDeleted(record); } else if (record.TransCreated.HasValue && record.TransCreated.Value != 0) { logger.Info($"Db monitor, found a newly created card transaction, Gid: {record.TransCreated.Value}"); lock (syncObj) { var gid = record.TransCreated.Value; if (sendQueue.Contains(gid)) { if ((DateTime.Now - record.OperationTime).TotalSeconds < 30) { logger.Info($"Gid: {gid} being processed now"); timer.Start(); return; } else { logger.Info($"Trans of GID: {gid} was created more than 30 seconds ago!"); } } else { sendQueue.Enqueue(gid); logger.Info($"Enqueued: {gid}"); } } ProcessCardTransaction(record); lock (syncObj) { if (sendQueue.Count >= 5) { var dequeuedGid = sendQueue.Dequeue(); logger.Info($"Dequeued: {dequeuedGid}"); } } } else if (record.GrayInfoCreated.HasValue && record.GrayInfoCreated.Value != 0) { logger.Info($"A new Gray info created, gid: {record.GrayInfoCreated.Value}"); ProcessGrayInfoCreated(record); } else if (record.GrayInfoDeleted != "0") { logger.Info($"A existing Gray info deleted, Card No: {record.GrayInfoDeleted}"); ProcessGrayInfoDeleted(record); } } else { logger.Debug("No record found"); } } } catch (Exception ex) { logger.Error(ex); } timer.Start(); } #region Multiple records handling private void ProcessMultipleCreatedAccounts(IEnumerable createdAccountRecords, IEnumerable createdAccoutsAudit) { OnMultipleAccountsCreated?.Invoke(this, new MultipleAccountRecordsEventArgs(createdAccountRecords, createdAccoutsAudit)); } private void ProcessMultipleUpdatedAccounts(IEnumerable updatedAccountRecords, IEnumerable updatedAccountsAudit) { OnMultipleAccountsUpdated?.Invoke(this, new MultipleAccountRecordsEventArgs(updatedAccountRecords, updatedAccountsAudit)); } private void ProcessMultipleCreatedCards(IEnumerable cardInfoRecords, IEnumerable createdCardsAudit) { OnMultipleCardsCreated?.Invoke(this, new MultipleCardRecordsEventArgs(cardInfoRecords, createdCardsAudit)); } private void ProcessMultipleUpdatedCards(IEnumerable cardInfoRecords, IEnumerable updatedCardsAudit) { OnMultipleCardsUpdated?.Invoke(this, new MultipleCardRecordsEventArgs(cardInfoRecords, updatedCardsAudit)); } #endregion #region Single record handling /////// Card payment record, for Smart fuel 1.0 private void ProcessCardTransaction(TTableaudit tableAudit) { OnCardTrxCreated?.Invoke(this, new CardTrxEventArgs(null, tableAudit)); } /////// Account, 2 operations. private void ProcessAccountCreated(TTableaudit tableAudit) { OnAccountCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } private void ProcessAccountUpdated(TTableaudit tableAudit) { OnAccountUpdated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } /////// CardInfo, 3 operations. private void ProcessCardInfoCreated(TTableaudit tableAudit) { OnCardInfoCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } private void ProcessCardInfoUpdated(TTableaudit tableAudit) { OnCardInfoUpdated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } private void ProcessCardInfoDeleted(TTableaudit tableAudit) { OnCardInfoDeleted?.Invoke(this, new CardChangedEventArgs(tableAudit.CardInfoDeleted, tableAudit)); } /////// Account and card top-up and reductions, 1 operation. private void ProcessRecharge(TTableaudit tableAudit) { OnRechargeCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } /////// Card closed or deactivated, 1 operation. private void ProcessClosedCard(TTableaudit tableAudit) { OnClosedCardCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } /////// Lost card and found card, 1 operation. private void ProcessLostCardCreated(TTableaudit tableAudit) { OnLostCardCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } /////// Base Blackcard, 2 operations. /// private void ProcessBaseBlackCardCreated(TTableaudit tableAudit) { OnBaseBlackCardCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } private void ProcessBaseBlackCardDeleted(TTableaudit tableAudit) { OnBaseBlackCardDeleted?.Invoke(this, new CardChangedEventArgs(tableAudit.BaseBlackCardDeleted, tableAudit)); } /////// Blacklisted card, 3 operations. private void ProcessBlacklistedCardCreated(TTableaudit tableAudit) { OnBlacklistedCardCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } private void ProcessBlacklistedCardUpdated(TTableaudit tableAudit) { OnBlacklistedCardUpdated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } private void ProcessBlacklistedCardDeleted(TTableaudit tableAudit) { OnBlacklistedCardDeleted?.Invoke(this, new CardChangedEventArgs(tableAudit.BlacklistedCardDeleted, tableAudit)); } /////// Released card, 2 operations. private void ProcessReleasedCardCreated(TTableaudit tableAudit) { OnReleasedCardCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } private void ProcessReleasedCardDeleted(TTableaudit tableAudit) { OnReleasedCardDeleted?.Invoke(this, new CardChangedEventArgs(tableAudit.ReleasedCardDeleted, tableAudit)); } /////// Gray info, 2 operations. private void ProcessGrayInfoCreated(TTableaudit tableAudit) { OnGrayInfoCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } private void ProcessGrayInfoDeleted(TTableaudit tableAudit) { OnGrayInfoDeleted?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime)); } #endregion #region Data versions private async Task CheckDataVersionsAsync() { using (var context = new SpsDbContext(connectionString)) { var versionData = await context.TVersioninfo.ToListAsync(); foreach (var versionInfo in versionData) { logger.Debug($"{versionInfo.Gid}, {versionInfo.VerNo}, {versionInfo.EffTime}, {versionInfo.LoseEffTime}"); } } } #endregion } }