DbMonitor.cs 26 KB


  1. using Dfs.WayneChina.SpsDataCourier.SpsData;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Text;
  5. using System.Linq;
  6. using Microsoft.EntityFrameworkCore;
  7. using System.Threading.Tasks;
  8. using System.Timers;
  9. using Microsoft.Extensions.Logging;
  10. using System.Collections.Concurrent;
  11. namespace Dfs.WayneChina.SpsDataCourier
  12. {
  13. public class DbMonitor
  14. {
  15. #region Fields
  // Timer that triggers a database scan; created and wired up in the constructor.
  private readonly Timer timer;

  // Scan interval as passed to the constructor; multiplied by 1000 to get the timer interval in milliseconds.
  private readonly int interval;

  // Stored from the constructor argument; not read anywhere else in this class.
  private readonly bool smartFuel05Compatible;

  // Gids of card transactions currently queued for sending. Always accessed under syncObj.
  // Not readonly: ClearFromQueue replaces the queue instance when it removes an entry.
  private Queue<long> sendQueue = new Queue<long>();

  // Lock object guarding sendQueue; readonly so the lock target can never be swapped out.
  private readonly object syncObj = new object();

  // Gids of recently processed transactions; AddToProcessedQueue drops the oldest entry once it holds more than 10.
  private readonly ConcurrentQueue<long> processedTransQueue = new ConcurrentQueue<long>();

  // Connection string handed to every SpsDbContext created by this monitor.
  private readonly string connectionString;
  23. #endregion
  24. #region Events
  // Raised by ProcessCardTransaction for an audit record whose TransCreated value is non-zero.
  25. public event EventHandler<CardTrxEventArgs> OnCardTrxCreated;
  // Single-record events: CheckDbAsync raises at most one of these per scan, for the first audit record it reads.
  26. public event EventHandler<EntryFoundEventArgs> OnAccountCreated;
  27. public event EventHandler<EntryFoundEventArgs> OnAccountUpdated;
  28. public event EventHandler<EntryFoundEventArgs> OnCardInfoCreated;
  29. public event EventHandler<EntryFoundEventArgs> OnCardInfoUpdated;
  30. public event EventHandler<CardChangedEventArgs> OnCardInfoDeleted;
  31. // Account/card top-up records and card deduction records (card top-up and reductions)
  32. public event EventHandler<EntryFoundEventArgs> OnRechargeCreated;
  33. // Loss-report and loss-report-cancellation records (lost card and found card)
  34. public event EventHandler<EntryFoundEventArgs> OnLostCardCreated;
  35. // Cancelled card (closed/deactivated card)
  36. public event EventHandler<EntryFoundEventArgs> OnClosedCardCreated;
  37. // Base blacklist card (black card)
  38. public event EventHandler<EntryFoundEventArgs> OnBaseBlackCardCreated;
  39. public event EventHandler<CardChangedEventArgs> OnBaseBlackCardDeleted;
  40. // Incremental blacklist card (blacklisted card)
  41. public event EventHandler<EntryFoundEventArgs> OnBlacklistedCardCreated;
  42. public event EventHandler<EntryFoundEventArgs> OnBlacklistedCardUpdated;
  43. public event EventHandler<CardChangedEventArgs> OnBlacklistedCardDeleted;
  44. // Decremental blacklist card (released card)
  45. public event EventHandler<EntryFoundEventArgs> OnReleasedCardCreated;
  46. // The "deleted" events carry the card number taken from the audit record
  47. public event EventHandler<CardChangedEventArgs> OnReleasedCardDeleted;
  48. public event EventHandler<EntryFoundEventArgs> OnGrayInfoCreated;
  49. public event EventHandler<EntryFoundEventArgs> OnGrayInfoDeleted;
  // Batch events: raised by CheckDbAsync when more than five audit records of the same kind are
  // pending and at least one matching account/card record could be loaded.
  50. public event EventHandler<MultipleAccountRecordsEventArgs> OnMultipleAccountsCreated;
  51. public event EventHandler<MultipleAccountRecordsEventArgs> OnMultipleAccountsUpdated;
  52. public event EventHandler<MultipleCardRecordsEventArgs> OnMultipleCardsCreated;
  53. public event EventHandler<MultipleCardRecordsEventArgs> OnMultipleCardsUpdated;
  54. #endregion
  55. #region Logger
  // Logger named "SpsDataCourier"; the NLog configuration is loaded from "NLog.config" when the instance is created.
  private readonly NLog.Logger logger = NLog.LogManager.LoadConfiguration("NLog.config").GetLogger("SpsDataCourier");
  57. #endregion
  58. #region Constructor
  /// <summary>
  /// Creates a monitor that scans the database on a fixed interval. The timer is
  /// configured here but not started; call <see cref="Start"/> to begin scanning.
  /// </summary>
  /// <param name="scanInterval">Scan interval; multiplied by 1000 to obtain the timer interval in milliseconds.</param>
  /// <param name="connectionString">Connection string used for every database context this monitor opens.</param>
  /// <param name="sf05Compatible">Compatibility flag; stored as-is.</param>
  public DbMonitor(int scanInterval, string connectionString, bool sf05Compatible)
  {
      this.connectionString = connectionString;
      smartFuel05Compatible = sf05Compatible;
      interval = scanInterval;

      timer = new Timer { Interval = interval * 1000 };
      timer.Elapsed += Timer_Elapsed;
  }
  68. #endregion
  /// <summary>
  /// Logs the start-up and starts the scan timer.
  /// </summary>
  public void Start()
  {
      logger.Info("Starting database monitor");
      timer.Start();
  }
  /// <summary>
  /// Logs the shutdown and stops the scan timer.
  /// </summary>
  public void Stop()
  {
      logger.Info("Stopping database monitor");
      timer.Stop();
  }
  /// <summary>
  /// Timer callback: runs one database scan.
  /// </summary>
  /// <remarks>
  /// This is an <c>async void</c> event handler, so an exception that escapes it cannot be
  /// observed by any caller. Everything is therefore caught and logged here.
  /// </remarks>
  private async void Timer_Elapsed(object sender, ElapsedEventArgs e)
  {
      try
      {
          logger.Debug("Start to scan database");
          await CheckDbAsync();
      }
      catch (Exception ex)
      {
          // Last line of defence; CheckDbAsync handles its own scan errors, but anything
          // thrown outside its try block (e.g. by the timer itself) would end up here.
          logger.Error($"Unhandled exception while scanning database, {ex}");
      }
  }
  /// <summary>
  /// Adds a transaction gid to the send queue unless it is already queued.
  /// The current queue content is logged on every call.
  /// </summary>
  /// <param name="gid">Gid of the transaction to enqueue.</param>
  /// <returns><c>true</c> if the gid was added; <c>false</c> if it was already in the queue.</returns>
  public bool AddToQueue(long gid)
  {
      lock (syncObj)
      {
          foreach (var queuedGid in sendQueue)
          {
              logger.Info($"Item in the send queue: {queuedGid}");
          }

          var alreadyQueued = sendQueue.Contains(gid);
          if (!alreadyQueued)
          {
              sendQueue.Enqueue(gid);
              logger.Info($"After check, Gid: {gid} added to queue");
              return true;
          }

          logger.Info($"Trying to enqueue gid {gid}, but it's being processed already");
          return false;
      }
  }
  /// <summary>
  /// Removes the transaction referenced by an audit record from the send queue.
  /// Does nothing when the record has no TransCreated value or the value is 0.
  /// </summary>
  /// <param name="tableaudit">Audit record whose TransCreated gid should be removed.</param>
  public void ClearFromQueue(TTableaudit tableaudit)
  {
      var transGid = tableaudit.TransCreated;
      if (!transGid.HasValue || transGid.Value == 0)
      {
          return;
      }

      // The gid overload takes the lock, writes the same log line and rebuilds the queue.
      ClearFromQueue(transGid.Value);
  }
  /// <summary>
  /// Removes every occurrence of <paramref name="gid"/> from the send queue,
  /// keeping the remaining entries in their original order.
  /// </summary>
  /// <param name="gid">Gid of the transaction to remove.</param>
  public void ClearFromQueue(long gid)
  {
      lock (syncObj)
      {
          logger.Info($"Removing {gid} from send queue");

          var remaining = new Queue<long>();
          foreach (var queuedGid in sendQueue)
          {
              if (queuedGid != gid)
              {
                  remaining.Enqueue(queuedGid);
              }
          }

          sendQueue = remaining;
      }
  }
  /// <summary>
  /// Records a gid as processed. When the processed queue already holds more than
  /// 10 entries, the oldest one is dropped first.
  /// </summary>
  /// <param name="gid">Gid of the processed transaction.</param>
  public void AddToProcessedQueue(long gid)
  {
      logger.Info($"Adding {gid} to ProcessedQueue");

      var queueIsFull = processedTransQueue.Count > 10;
      if (queueIsFull)
      {
          long evicted;
          processedTransQueue.TryDequeue(out evicted);
          logger.Info($"To be removed from ProcessedQueue: {evicted}");
      }

      processedTransQueue.Enqueue(gid);
  }
  /// <summary>
  /// Tells whether <paramref name="gid"/> is currently held in the processed queue.
  /// </summary>
  /// <param name="gid">Gid of the transaction to look up.</param>
  /// <returns><c>true</c> when the gid is in the processed queue.</returns>
  public bool IsTransAlreadyUploaded(long gid) =>
      processedTransQueue.Any(processedGid => processedGid == gid);
  // When more than this many audit records of one kind are pending, they are reported
  // together through the OnMultiple* events instead of one record per scan.
  private const int MultipleRecordsThreshold = 5;

  /// <summary>
  /// Runs one scan of the audit table:
  /// removes the "card deleted" audit records, reports large groups of created/updated
  /// accounts and cards as batches, then reloads the table and dispatches the first
  /// remaining record to the matching single-record handler.
  /// </summary>
  /// <remarks>
  /// The timer is stopped for the duration of the scan and restarted in <c>finally</c>, so
  /// every exit path restarts it.
  /// NOTE(review): because of that restart, a <see cref="Stop"/> issued while a scan is
  /// running does not stick - confirm whether that matters to callers.
  /// NOTE(review): the string columns are compared with <c>!= "0"</c>, which also matches
  /// <c>null</c>; this presumably relies on the columns defaulting to "0" - verify against the schema.
  /// </remarks>
  private async Task CheckDbAsync()
  {
      timer.Stop();
      //await CheckDataVersionsAsync();
      try
      {
          using (var context = new SpsDbContext(connectionString))
          {
              var allRecords = await context.TTableaudit.ToListAsync();

              await RemoveDeletedCardRecordsAsync(context, allRecords);
              await ReportMultipleRecordsAsync(context, allRecords);

              // Just to refresh here
              allRecords = await context.TTableaudit.ToListAsync();
              var deletedCardsCount1 = allRecords.Where(t => t.CardInfoDeleted != "0").Count();
              if (deletedCardsCount1 > 0)
              {
                  logger.Info($"After refresh, delete cards count: {deletedCardsCount1}");
              }

              TTableaudit record = null;
              if (allRecords.Count > 0)
              {
                  // FirstOrDefault rather than First: the table may have been emptied
                  // between the refresh above and this query.
                  record = await context.TTableaudit.FirstOrDefaultAsync();
              }

              if (record != null)
              {
                  DispatchSingleRecord(record);
              }
              else
              {
                  logger.Debug("No record found");
              }
          }
      }
      catch (Exception ex)
      {
          logger.Error(ex);
      }
      finally
      {
          timer.Start();
      }
  }

  // Deletes the audit records that flag a deleted card; a failure is logged and the scan continues.
  private async Task RemoveDeletedCardRecordsAsync(SpsDbContext context, List<TTableaudit> allRecords)
  {
      // Check the abundance of `deleted` card records.
      var deletedCards = allRecords.Where(t => t.CardInfoDeleted != "0").ToList();
      if (deletedCards.Count == 0)
      {
          return;
      }

      logger.Info($"Deleted cards count: {deletedCards.Count}");

      //Remove the 'DeletedCard' records, can't get them anyway...
      try
      {
          context.TTableaudit.RemoveRange(deletedCards);
          await context.SaveChangesAsync();
      }
      catch (Exception ex)
      {
          logger.Error($"Exception in removing the DELETED CARD records, {ex.ToString()}");
      }
  }

  // Raises the OnMultiple* events for each kind of audit record that has more than
  // MultipleRecordsThreshold pending entries.
  private async Task ReportMultipleRecordsAsync(SpsDbContext context, List<TTableaudit> allRecords)
  {
      // The filters require HasValue explicitly: `nullable != 0` is true for null, and the
      // subsequent `.Value` would then throw and abort the whole scan.

      //upload all `created` accounts
      var createdAccounts = allRecords
          .Where(t => t.AccountCreated.HasValue && t.AccountCreated.Value != 0)
          .ToList();
      if (createdAccounts.Count > 0)
      {
          logger.Info($"Multiple created accounts, count: {createdAccounts.Count}");
      }
      if (createdAccounts.Count > MultipleRecordsThreshold)
      {
          var createdAccountInfo = await LoadByGidAsync(
              createdAccounts.Select(r => Convert.ToUInt64(r.AccountCreated.Value)),
              gid => context.TAcctinfo.AsNoTracking().FirstOrDefaultAsync(a => a.Gid == gid));
          if (createdAccountInfo.Count > 0)
          {
              ProcessMultipleCreatedAccounts(createdAccountInfo, createdAccounts);
          }
      }

      //upload all `updated` accounts
      var updatedAccounts = allRecords
          .Where(t => t.AccountUpdated.HasValue && t.AccountUpdated.Value != 0)
          .ToList();
      if (updatedAccounts.Count > 0)
      {
          logger.Info($"Multiple updated accounts, count: {updatedAccounts.Count}");
      }
      if (updatedAccounts.Count > MultipleRecordsThreshold)
      {
          var updatedAccountInfo = await LoadByGidAsync(
              updatedAccounts.Select(r => Convert.ToUInt64(r.AccountUpdated.Value)),
              gid => context.TAcctinfo.AsNoTracking().FirstOrDefaultAsync(a => a.Gid == gid));
          if (updatedAccountInfo.Count > 0)
          {
              ProcessMultipleUpdatedAccounts(updatedAccountInfo, updatedAccounts);
          }
      }

      //upload all `created` cards
      var createdCards = allRecords
          .Where(t => t.CardInfoCreated.HasValue && t.CardInfoCreated.Value != 0)
          .ToList();
      if (createdCards.Count > 0)
      {
          logger.Info($"Multiple created cards count: {createdCards.Count}");
      }
      if (createdCards.Count > MultipleRecordsThreshold)
      {
          var createdCardInfo = await LoadByGidAsync(
              createdCards.Select(r => Convert.ToUInt64(r.CardInfoCreated.Value)),
              gid => context.TCardinfo.AsNoTracking().FirstOrDefaultAsync(c => c.Gid == gid));
          if (createdCardInfo.Count > 0)
          {
              logger.Info($"Found multiple created card info records, count: {createdCardInfo.Count}");
              ProcessMultipleCreatedCards(createdCardInfo, createdCards);
          }
      }

      //upload all `updated` cards; several audit records for the same card collapse into one
      var updatedCards = allRecords
          .Where(t => t.CardInfoUpdated.HasValue && t.CardInfoUpdated.Value != 0)
          .Distinct(new GenericComparer<TTableaudit, long>(a => a.CardInfoUpdated.Value))
          .ToList();
      if (updatedCards.Count > 0)
      {
          logger.Info($"Multiple updated cards, count: {updatedCards.Count}");
      }
      if (updatedCards.Count > MultipleRecordsThreshold)
      {
          var updatedCardInfo = await LoadByGidAsync(
              updatedCards.Select(r => Convert.ToUInt64(r.CardInfoUpdated.Value)),
              gid => context.TCardinfo.AsNoTracking().FirstOrDefaultAsync(c => c.Gid == gid));
          if (updatedCardInfo.Count > 0)
          {
              ProcessMultipleUpdatedCards(updatedCardInfo, updatedCards);
          }
      }
  }

  // Looks up one record per gid, in order, and returns those that were found.
  private static async Task<List<TInfo>> LoadByGidAsync<TInfo>(
      IEnumerable<ulong> gids,
      Func<ulong, Task<TInfo>> findByGidAsync)
      where TInfo : class
  {
      var found = new List<TInfo>();
      foreach (var gid in gids)
      {
          var info = await findByGidAsync(gid);
          if (info != null)
          {
              found.Add(info);
          }
      }
      return found;
  }

  // Routes one audit record to the handler of the first populated column, in fixed priority order.
  private void DispatchSingleRecord(TTableaudit record)
  {
      if (record.RechargeCreated.HasValue && record.RechargeCreated.Value != 0)
      {
          logger.Info($"New recharge or reduction record, Gid: {record.RechargeCreated.Value}");
          ProcessRecharge(record);
      }
      else if (record.CardInfoCreated.HasValue && record.CardInfoCreated.Value != 0)
      {
          logger.Info($"New CardInfo record, Gid: {record.CardInfoCreated.Value}");
          ProcessCardInfoCreated(record);
      }
      else if (record.CardInfoUpdated.HasValue && record.CardInfoUpdated.Value != 0)
      {
          logger.Info($"CardInfo updated, Gid: {record.CardInfoUpdated.Value}");
          ProcessCardInfoUpdated(record);
      }
      else if (record.CardInfoDeleted != "0")
      {
          logger.Info($"CardInfo deleted, CardNo: {record.CardInfoDeleted}");
          ProcessCardInfoDeleted(record);
      }
      else if (record.AccountCreated.HasValue && record.AccountCreated.Value != 0)
      {
          logger.Info($"New account created, Gid: {record.AccountCreated.Value}");
          ProcessAccountCreated(record);
      }
      else if (record.AccountUpdated.HasValue && record.AccountUpdated.Value != 0)
      {
          logger.Info($"Account updated on GID: {record.AccountUpdated.Value}");
          ProcessAccountUpdated(record);
      }
      else if (record.ClosedCardCreated.HasValue && record.ClosedCardCreated.Value != 0)
      {
          logger.Info($"Card closed/deactivated, Gid: {record.ClosedCardCreated.Value}");
          ProcessClosedCard(record);
      }
      else if (record.LostCardCreated.HasValue && record.LostCardCreated.Value != 0)
      {
          logger.Info($"Card lost or found registerred, Gid: {record.LostCardCreated.Value}");
          ProcessLostCardCreated(record);
      }
      else if (record.BlacklistedCardCreated.HasValue && record.BlacklistedCardCreated.Value != 0)
      {
          logger.Info($"A blacklisted card is registerred, Gid: {record.BlacklistedCardCreated.Value}");
          ProcessBlacklistedCardCreated(record);
      }
      else if (record.BlacklistedCardUpdated.HasValue && record.BlacklistedCardUpdated.Value != 0)
      {
          logger.Info($"A card which was registered as LOST CARD is released, Gid: {record.BlacklistedCardUpdated.Value}");
          ProcessBlacklistedCardUpdated(record);
      }
      else if (record.BlacklistedCardDeleted != "0")
      {
          logger.Info($"A blacklisted card was deleted, CardNo: {record.BlacklistedCardDeleted}");
          ProcessBlacklistedCardDeleted(record);
      }
      else if (record.ReleasedCardCreated.HasValue && record.ReleasedCardCreated.Value != 0)
      {
          logger.Info($"A lost card was found and released, Gid: {record.ReleasedCardCreated.Value}");
          ProcessReleasedCardCreated(record);
      }
      else if (record.ReleasedCardDeleted != "0")
      {
          logger.Info($"A released card is deleted, card no: {record.ReleasedCardDeleted}");
          ProcessReleasedCardDeleted(record);
      }
      else if (record.BaseBlackCardCreated.HasValue && record.BaseBlackCardCreated.Value != 0)
      {
          logger.Info($"A base black card is created, gid: {record.BaseBlackCardCreated.Value}");
          ProcessBaseBlackCardCreated(record);
      }
      else if (record.BaseBlackCardDeleted != "0")
      {
          logger.Info($"A base black card is deleted, card no: {record.BaseBlackCardDeleted}");
          ProcessBaseBlackCardDeleted(record);
      }
      else if (record.TransCreated.HasValue && record.TransCreated.Value != 0)
      {
          HandleTransCreated(record);
      }
      else if (record.GrayInfoCreated.HasValue && record.GrayInfoCreated.Value != 0)
      {
          logger.Info($"A new Gray info created, gid: {record.GrayInfoCreated.Value}");
          ProcessGrayInfoCreated(record);
      }
      else if (record.GrayInfoDeleted != "0")
      {
          logger.Info($"A existing Gray info deleted, Card No: {record.GrayInfoDeleted}");
          ProcessGrayInfoDeleted(record);
      }
  }

  // Handles a newly created card transaction, using the send queue to avoid re-sending
  // a transaction that is still being processed.
  private void HandleTransCreated(TTableaudit record)
  {
      logger.Info($"Db monitor, found a newly created card transaction, Gid: {record.TransCreated.Value}");

      lock (syncObj)
      {
          var gid = record.TransCreated.Value;
          if (sendQueue.Contains(gid))
          {
              if ((DateTime.Now - record.OperationTime).TotalSeconds < 30)
              {
                  // Already queued and still recent: leave it to the sender that is working on it.
                  logger.Info($"Gid: {gid} being processed now");
                  return;
              }

              // Queued for 30 seconds or more: fall through and raise the event again.
              logger.Info($"Trans of GID: {gid} was created more than 30 seconds ago!");
          }
          else
          {
              sendQueue.Enqueue(gid);
              logger.Info($"Enqueued: {gid}");
          }
      }

      ProcessCardTransaction(record);

      lock (syncObj)
      {
          // Keep the send queue short by dropping the oldest entry once it holds five or more.
          if (sendQueue.Count >= 5)
          {
              var dequeuedGid = sendQueue.Dequeue();
              logger.Info($"Dequeued: {dequeuedGid}");
          }
      }
  }
  392. #region Multiple records handling
  /// <summary>Raises <see cref="OnMultipleAccountsCreated"/> with the loaded account records and their audit records.</summary>
  private void ProcessMultipleCreatedAccounts(IEnumerable<TAcctinfo> createdAccountRecords, IEnumerable<TTableaudit> createdAccoutsAudit) =>
      OnMultipleAccountsCreated?.Invoke(
          this,
          new MultipleAccountRecordsEventArgs(createdAccountRecords, createdAccoutsAudit));
  /// <summary>Raises <see cref="OnMultipleAccountsUpdated"/> with the loaded account records and their audit records.</summary>
  private void ProcessMultipleUpdatedAccounts(IEnumerable<TAcctinfo> updatedAccountRecords, IEnumerable<TTableaudit> updatedAccountsAudit) =>
      OnMultipleAccountsUpdated?.Invoke(
          this,
          new MultipleAccountRecordsEventArgs(updatedAccountRecords, updatedAccountsAudit));
  /// <summary>Raises <see cref="OnMultipleCardsCreated"/> with the loaded card records and their audit records.</summary>
  private void ProcessMultipleCreatedCards(IEnumerable<TCardinfo> cardInfoRecords, IEnumerable<TTableaudit> createdCardsAudit) =>
      OnMultipleCardsCreated?.Invoke(
          this,
          new MultipleCardRecordsEventArgs(cardInfoRecords, createdCardsAudit));
  /// <summary>Raises <see cref="OnMultipleCardsUpdated"/> with the loaded card records and their audit records.</summary>
  private void ProcessMultipleUpdatedCards(IEnumerable<TCardinfo> cardInfoRecords, IEnumerable<TTableaudit> updatedCardsAudit) =>
      OnMultipleCardsUpdated?.Invoke(
          this,
          new MultipleCardRecordsEventArgs(cardInfoRecords, updatedCardsAudit));
  409. #endregion
  410. #region Single record handling
  411. /////// Card payment record, for Smart fuel 1.0
  /// <summary>Raises <see cref="OnCardTrxCreated"/>; the first event argument is passed as <c>null</c>, only the audit record is supplied.</summary>
  private void ProcessCardTransaction(TTableaudit tableAudit) =>
      OnCardTrxCreated?.Invoke(this, new CardTrxEventArgs(null, tableAudit));
  416. /////// Account, 2 operations.
  /// <summary>Raises <see cref="OnAccountCreated"/> with the audit record and its operation time.</summary>
  private void ProcessAccountCreated(TTableaudit tableAudit) =>
      OnAccountCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  /// <summary>Raises <see cref="OnAccountUpdated"/> with the audit record and its operation time.</summary>
  private void ProcessAccountUpdated(TTableaudit tableAudit) =>
      OnAccountUpdated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  425. /////// CardInfo, 3 operations.
  /// <summary>Raises <see cref="OnCardInfoCreated"/> with the audit record and its operation time.</summary>
  private void ProcessCardInfoCreated(TTableaudit tableAudit) =>
      OnCardInfoCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  /// <summary>Raises <see cref="OnCardInfoUpdated"/> with the audit record and its operation time.</summary>
  private void ProcessCardInfoUpdated(TTableaudit tableAudit) =>
      OnCardInfoUpdated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  /// <summary>Raises <see cref="OnCardInfoDeleted"/> with the record's CardInfoDeleted value and the audit record.</summary>
  private void ProcessCardInfoDeleted(TTableaudit tableAudit) =>
      OnCardInfoDeleted?.Invoke(this, new CardChangedEventArgs(tableAudit.CardInfoDeleted, tableAudit));
  438. /////// Account and card top-up and reductions, 1 operation.
  /// <summary>Raises <see cref="OnRechargeCreated"/> with the audit record and its operation time.</summary>
  private void ProcessRecharge(TTableaudit tableAudit) =>
      OnRechargeCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  443. /////// Card closed or deactivated, 1 operation.
  /// <summary>Raises <see cref="OnClosedCardCreated"/> with the audit record and its operation time.</summary>
  private void ProcessClosedCard(TTableaudit tableAudit) =>
      OnClosedCardCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  448. /////// Lost card and found card, 1 operation.
  /// <summary>Raises <see cref="OnLostCardCreated"/> with the audit record and its operation time.</summary>
  private void ProcessLostCardCreated(TTableaudit tableAudit) =>
      OnLostCardCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  453. /////// Base Blackcard, 2 operations.
  454. ///
  /// <summary>Raises <see cref="OnBaseBlackCardCreated"/> with the audit record and its operation time.</summary>
  private void ProcessBaseBlackCardCreated(TTableaudit tableAudit) =>
      OnBaseBlackCardCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  /// <summary>Raises <see cref="OnBaseBlackCardDeleted"/> with the record's BaseBlackCardDeleted value and the audit record.</summary>
  private void ProcessBaseBlackCardDeleted(TTableaudit tableAudit) =>
      OnBaseBlackCardDeleted?.Invoke(this, new CardChangedEventArgs(tableAudit.BaseBlackCardDeleted, tableAudit));
  463. /////// Blacklisted card, 3 operations.
  /// <summary>Raises <see cref="OnBlacklistedCardCreated"/> with the audit record and its operation time.</summary>
  private void ProcessBlacklistedCardCreated(TTableaudit tableAudit) =>
      OnBlacklistedCardCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  /// <summary>Raises <see cref="OnBlacklistedCardUpdated"/> with the audit record and its operation time.</summary>
  private void ProcessBlacklistedCardUpdated(TTableaudit tableAudit) =>
      OnBlacklistedCardUpdated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  /// <summary>Raises <see cref="OnBlacklistedCardDeleted"/> with the record's BlacklistedCardDeleted value and the audit record.</summary>
  private void ProcessBlacklistedCardDeleted(TTableaudit tableAudit) =>
      OnBlacklistedCardDeleted?.Invoke(this, new CardChangedEventArgs(tableAudit.BlacklistedCardDeleted, tableAudit));
  476. /////// Released card, 2 operations.
  /// <summary>Raises <see cref="OnReleasedCardCreated"/> with the audit record and its operation time.</summary>
  private void ProcessReleasedCardCreated(TTableaudit tableAudit) =>
      OnReleasedCardCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  /// <summary>Raises <see cref="OnReleasedCardDeleted"/> with the record's ReleasedCardDeleted value and the audit record.</summary>
  private void ProcessReleasedCardDeleted(TTableaudit tableAudit) =>
      OnReleasedCardDeleted?.Invoke(this, new CardChangedEventArgs(tableAudit.ReleasedCardDeleted, tableAudit));
  485. /////// Gray info, 2 operations.
  /// <summary>Raises <see cref="OnGrayInfoCreated"/> with the audit record and its operation time.</summary>
  private void ProcessGrayInfoCreated(TTableaudit tableAudit) =>
      OnGrayInfoCreated?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  /// <summary>
  /// Raises <see cref="OnGrayInfoDeleted"/> with the audit record and its operation time.
  /// Unlike the other "deleted" handlers, this one uses EntryFoundEventArgs rather than CardChangedEventArgs.
  /// </summary>
  private void ProcessGrayInfoDeleted(TTableaudit tableAudit) =>
      OnGrayInfoDeleted?.Invoke(this, new EntryFoundEventArgs(tableAudit, tableAudit.OperationTime));
  494. #endregion
  495. #region Data versions
  /// <summary>
  /// Loads every row of the version-info table and writes each one to the debug log.
  /// The only call to this method inside this class is commented out.
  /// </summary>
  private async Task CheckDataVersionsAsync()
  {
      using (var db = new SpsDbContext(connectionString))
      {
          var versionRows = await db.TVersioninfo.ToListAsync();
          versionRows.ForEach(row =>
              logger.Debug($"{row.Gid}, {row.VerNo}, {row.EffTime}, {row.LoseEffTime}"));
      }
  }
  507. #endregion
  508. }
  509. }