SiteConfigScannerCommunicator.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Configuration;
  4. using System.Data;
  5. using System.Data.Odbc;
  6. using System.Data.SqlClient;
  7. using System.IO;
  8. using System.Linq;
  9. using System.Text;
  10. using System.Threading;
  11. using System.Timers;
  12. using Wayne.Lib;
  13. using Wayne.Lib.Log;
  14. namespace SinochemInternetPlusApp
  15. {
  /// <summary>
  /// Periodically reads the pump/hose configuration from the FC database, derives the
  /// jiHao / side / site-level nozzle numbering from it and overwrites the POS table
  /// <c>jyjpz</c> with the result.
  /// </summary>
  internal class SiteConfigScannerCommunicator : ICommunicator, IDisposable
  {
      private const string TruncatePosConfigSql = "TRUNCATE table jyjpz";

      private const string InsertPosConfigSql =
          "INSERT jyjpz (jihao, abtype, qianghao, luojiqh) VALUES (@jihao, @abtype, @qianghao, @luojiqh)";

      private static readonly NLog.Logger debugLogger =
          NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("PumpHandler");

      /// <summary>
      /// 0 for not started, 1 for started already.
      /// </summary>
      private int isStarted = 0;

      /// <summary>
      /// 0 when no timer-triggered scan is running, 1 while one is in progress.
      /// </summary>
      private int isScanning = 0;

      /// <summary>
      /// 0 until <see cref="Dispose"/> has run once, 1 afterwards.
      /// </summary>
      private int isDisposed = 0;

      /// <summary>
      /// PumpIdAsSideA:PumpIdAsSideB:JiHao
      /// </summary>
      private readonly IEnumerable<Tuple<int, int, int>> pumpSidePairs = null;

      /// <summary>
      /// pumpId:RealHoseLogicalId:HuiTianSiteLevelHoseId
      /// </summary>
      public static IEnumerable<Tuple<int, int, int>> nozzleRemappingPairs = null;

      private System.Timers.Timer scannerTimer;

      /// <summary>
      /// Creates a site configuration scanner that syncs the FC db config into the POS db.
      /// Calling <see cref="Start"/> forces a sync right away and then repeats it on a timer.
      /// </summary>
      /// <param name="scanningInterval">Timer interval between two scans, in milliseconds.</param>
      /// <exception cref="InvalidOperationException">The app setting 'PumpSideMapping' is missing.</exception>
      /// <exception cref="FormatException">A mapping setting contains an entry that is not 'int:int:int'.</exception>
      public SiteConfigScannerCommunicator(int scanningInterval)
      {
          var pumpSideMappingRawString = GenericSinochemEpsApp.AppSettings["PumpSideMapping"];
          if (pumpSideMappingRawString == null)
          {
              throw new InvalidOperationException("Required app setting 'PumpSideMapping' is missing.");
          }

          // Parsed once and materialized: the mappings are consulted for every row on every scan.
          this.pumpSidePairs = ParseIdTriples(pumpSideMappingRawString, "PumpSideMapping");

          var nozzleRemappingRawString = GenericSinochemEpsApp.AppSettings["forceMappingFusionHoseToHuiTianHose"];
          if (nozzleRemappingRawString != null)
          {
              debugLogger.Info("will parse forceMappingFusionHoseToHuiTianHose, raw string in config is: "
                  + nozzleRemappingRawString.Trim().Replace(" ", ""));
              nozzleRemappingPairs = ParseIdTriples(nozzleRemappingRawString, "forceMappingFusionHoseToHuiTianHose");
          }

          this.scannerTimer = new System.Timers.Timer(scanningInterval);
          this.scannerTimer.Elapsed += scannerTimer_Elapsed;
      }

      /// <summary>
      /// Parses a setting of the form "a:b:c;a:b:c;..." into a list of integer triples.
      /// Spaces and empty entries (e.g. a trailing ';') are ignored.
      /// </summary>
      /// <param name="rawSetting">Raw, non-null setting value.</param>
      /// <param name="settingName">Setting name, used only in error messages.</param>
      /// <returns>The parsed triples; empty when the setting holds no entries.</returns>
      /// <exception cref="FormatException">An entry has fewer than three parts or a non-integer part.</exception>
      private static List<Tuple<int, int, int>> ParseIdTriples(string rawSetting, string settingName)
      {
          var triples = new List<Tuple<int, int, int>>();
          var entries = rawSetting.Trim().Replace(" ", "")
              .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
          foreach (var entry in entries)
          {
              var parts = entry.Split(':');
              int first;
              int second;
              int third;
              if (parts.Length < 3
                  || !int.TryParse(parts[0], out first)
                  || !int.TryParse(parts[1], out second)
                  || !int.TryParse(parts[2], out third))
              {
                  throw new FormatException("App setting '" + settingName + "' contains an invalid entry '"
                      + entry + "', expected 3 integers separated by ':'.");
              }

              triples.Add(Tuple.Create(first, second, third));
          }

          return triples;
      }

      /// <summary>
      /// Reads an integer from a row cell, whatever the cell's underlying storage type.
      /// </summary>
      private static int ReadInt(DataRow row, string columnName)
      {
          return int.Parse(row[columnName].ToString());
      }

      /// <summary>
      /// Timer callback: runs one scan unless the previous timer-triggered scan is still running.
      /// </summary>
      void scannerTimer_Elapsed(object sender, ElapsedEventArgs e)
      {
          // System.Timers.Timer may raise Elapsed again while a slow scan is still executing;
          // skip this tick so that two truncate+insert runs never interleave.
          if (Interlocked.CompareExchange(ref this.isScanning, 1, 0) != 0)
          {
              debugLogger.Info("previous site config scan is still running, skip this round");
              return;
          }

          try
          {
              this.PerformCompareAndUpdate();
          }
          finally
          {
              Interlocked.Exchange(ref this.isScanning, 0);
          }
      }

      /// <summary>
      /// read config from FC database, and overwrite the POS database table with it.
      /// </summary>
      /// <returns>true for updated, false for not updated (empty FC config or any failure).</returns>
      private bool PerformCompareAndUpdate()
      {
          try
          {
              var configInFcDb = this.LoadOrderedConfigFromFcDb();

              // The POS content is not compared yet; it is read for its diagnostic logging
              // and so that an unreachable POS database aborts the round early.
              this.LoadOrderedConfigFromPosDb();

              if (configInFcDb == null || !configInFcDb.Any())
              {
                  debugLogger.Info("config In FC Database is empty, skip this round");
                  return false;
              }

              SiteConfigUtility.Default.UpdateLatestSiteConfig(configInFcDb);

              // for now, always overwrite the whole table in POS database without further check for rows or columns level.
              this.OverwritePosConfig(configInFcDb);
              return true;
          }
          catch (Exception ex)
          {
              debugLogger.Error("PerformCompareAndUpdate exceptioned, detail: " + ex);
              return false;
          }
      }

      /// <summary>
      /// Replaces the whole content of the POS table jyjpz with the given rows.
      /// Truncate and inserts share one transaction, so a failure leaves the table unchanged.
      /// </summary>
      /// <param name="configRows">Rows carrying jiHao, sideId, HoseLogicalId and siteLevelNozzleId.</param>
      private void OverwritePosConfig(IEnumerable<DataRow> configRows)
      {
          using (var posConn = new SqlConnection(GenericSinochemEpsApp.AppSettings["PosDatabaseConnStr"]))
          {
              posConn.Open();

              // Disposing the transaction without Commit() rolls everything back.
              using (var transaction = posConn.BeginTransaction())
              {
                  using (var truncateCmd = new SqlCommand(TruncatePosConfigSql, posConn, transaction))
                  {
                      truncateCmd.ExecuteNonQuery();
                  }

                  var insertedRows = 0;
                  using (var insertCmd = new SqlCommand(InsertPosConfigSql, posConn, transaction))
                  {
                      // Values are bound as parameters instead of being formatted into the SQL text.
                      var jiHaoParam = insertCmd.Parameters.Add("@jihao", SqlDbType.Int);
                      var sideParam = insertCmd.Parameters.Add("@abtype", SqlDbType.VarChar, 8);
                      var hoseParam = insertCmd.Parameters.Add("@qianghao", SqlDbType.Int);
                      var siteNozzleParam = insertCmd.Parameters.Add("@luojiqh", SqlDbType.Int);

                      foreach (var row in configRows)
                      {
                          jiHaoParam.Value = ReadInt(row, "jiHao");
                          sideParam.Value = row["sideId"].ToString();
                          hoseParam.Value = ReadInt(row, "HoseLogicalId");
                          siteNozzleParam.Value = ReadInt(row, "siteLevelNozzleId");
                          insertCmd.ExecuteNonQuery();
                          insertedRows++;
                      }
                  }

                  transaction.Commit();
                  debugLogger.Info("truncateAndInsertSiteConfigCmd: " + TruncatePosConfigSql + "; "
                      + InsertPosConfigSql + " (rows inserted: " + insertedRows + ")");
              }
          }
      }

      /// <summary>
      /// True once <see cref="Start"/> has succeeded (or is currently executing).
      /// </summary>
      public bool IsStarted
      {
          get { return this.isStarted == 1; }
      }

      /// <summary>
      /// Performs the initial sync and, if it succeeds, starts the periodic scanning timer.
      /// </summary>
      /// <returns>true when the initial sync succeeded and the timer is running; otherwise false.</returns>
      /// <exception cref="InvalidOperationException">The communicator is already started.</exception>
      public bool Start()
      {
          if (0 != Interlocked.CompareExchange(ref this.isStarted, 1, 0))
          {
              throw new InvalidOperationException("Already started.");
          }

          // the initial load must be succeed!
          if (!this.PerformCompareAndUpdate())
          {
              // Nothing is running: clear the flag so IsStarted stays truthful and Start() can be retried.
              Interlocked.Exchange(ref this.isStarted, 0);
              return false;
          }

          this.scannerTimer.Start();
          return true;
      }

      /// <summary>
      /// Loads every hose's logical id from the FC database, tags each row with its jiHao and
      /// side (from the PumpSideMapping setting), orders the rows by jiHao, side and hose logical id,
      /// and numbers them 1..n as siteLevelNozzleId (optionally overridden by nozzleRemappingPairs).
      /// </summary>
      /// <returns>The ordered rows, with the extra columns jiHao, siteLevelNozzleId and sideId filled in.</returns>
      /// <exception cref="ArgumentException">A pump id is listed neither as side A nor as side B.</exception>
      private IEnumerable<DataRow> LoadOrderedConfigFromFcDb()
      {
          try
          {
              var siteNozzlesDataTableInFcDb = new DataTable();
              using (var fcConnection = new SqlConnection(GenericSinochemEpsApp.AppSettings["FCDatabaseConnStr"]))
              using (var adapter = new SqlDataAdapter(
                  "select id as PumpId, device_id as HoseName, param_value as HoseLogicalId " +
                  " from config_values where library = 'pump' and device_id like 'hose%' and parameter ='logical_id' " +
                  // cast(...) instead of the PostgreSQL-only 'id::int', which SQL Server (SqlConnection) rejects.
                  " order by cast(id as int), device_id",
                  fcConnection))
              {
                  adapter.Fill(siteNozzlesDataTableInFcDb);
              }

              /* the returned data from the db is like:
                   PumpId   HoseName   HoseLogicalId
                   "1"      "hose3"    "3"
                   "1"      "hose4"    "4"
                   "1"      "hose2"    "2"
                   "1"      "hose1"    "1"
                   "10"     "hose2"    "2"
                   "10"     "hose1"    "1"
                   "10"     "hose3"    "3"
                   "10"     "hose4"    "4"
              */
              // PetroChina defined an id 'jiHao' which stands for a physical pump (2 sides).
              siteNozzlesDataTableInFcDb.Columns.Add(new DataColumn("jiHao"));
              // PetroChina needs another, site-wide id for a nozzle, starting from 1.
              // We calculate it from the row order (jiHao, side, HoseLogicalId).
              siteNozzlesDataTableInFcDb.Columns.Add(new DataColumn("siteLevelNozzleId"));
              // as doc defined, side A is 0, side B is 1.
              siteNozzlesDataTableInFcDb.Columns.Add(new DataColumn("sideId"));

              var allRows = siteNozzlesDataTableInFcDb.Rows.Cast<DataRow>().ToList();
              foreach (var row in allRows)
              {
                  var pumpId = ReadInt(row, "pumpid");
                  var sideAMatch = this.pumpSidePairs.FirstOrDefault(p => p.Item1 == pumpId);
                  if (sideAMatch != null)
                  {
                      row["sideId"] = "0";
                      row["jiHao"] = sideAMatch.Item3;
                      continue;
                  }

                  var sideBMatch = this.pumpSidePairs.FirstOrDefault(p => p.Item2 == pumpId);
                  if (sideBMatch == null)
                  {
                      throw new ArgumentException("Pump with pumpId: " + pumpId +
                          " is neither side A nor side B, pls check Side definition config file.");
                  }

                  row["sideId"] = "1";
                  row["jiHao"] = sideBMatch.Item3;
              }

              // Sorted once and materialized. Hose logical ids are compared numerically so that
              // e.g. 10 sorts after 2 rather than before it.
              var sortedSiteNozzlesDataRowsInFcDb = allRows
                  .OrderBy(r => ReadInt(r, "jiHao"))
                  .ThenBy(r => r["sideId"].ToString(), StringComparer.Ordinal)
                  .ThenBy(r => ReadInt(r, "HoseLogicalId"))
                  .ToList();

              debugLogger.Debug("sortedSiteNozzlesDataRowsInFcDb: ");
              // finally we get the ordered sequence, now give them sitelevel nozzle id.
              for (var i = 0; i < sortedSiteNozzlesDataRowsInFcDb.Count; i++)
              {
                  var oneRow = sortedSiteNozzlesDataRowsInFcDb[i];
                  oneRow["siteLevelNozzleId"] = i + 1;
                  debugLogger.Debug(" JiHao: " + oneRow["jiHao"] + ", sideId: " + oneRow["sideId"] +
                      ", nozzleLogicalId: " + oneRow["HoseLogicalId"] +
                      ", siteLevelNozzleId: " + oneRow["siteLevelNozzleId"]
                      + ", pumpId: " + oneRow["pumpid"]);

                  if (nozzleRemappingPairs == null)
                  {
                      continue;
                  }

                  var remapPumpId = ReadInt(oneRow, "pumpid");
                  var fusionHoseLogicalId = ReadInt(oneRow, "HoseLogicalId");
                  var matched = nozzleRemappingPairs.FirstOrDefault(
                      m => m.Item1 == remapPumpId && m.Item2 == fusionHoseLogicalId);
                  if (matched != null)
                  {
                      // NOTE(review): a forced id is not checked against the ids assigned to other rows.
                      debugLogger.Debug("Found nozzleRemappingPairs for pumpId: " + remapPumpId + " with HoseLogicalId: " + fusionHoseLogicalId + ", " +
                          "will remapping the bound siteLevelNozzleId from: " + oneRow["siteLevelNozzleId"] + " to: " + matched.Item3);
                      oneRow["siteLevelNozzleId"] = matched.Item3;
                  }
              }

              return sortedSiteNozzlesDataRowsInFcDb;
          }
          catch (Exception ex)
          {
              debugLogger.Error("LoadConfigFromFcDb exceptioned, detail: " + ex);
              throw;
          }
      }

      /// <summary>
      /// Loads the current content of the POS table jyjpz (ordered by jihao, abtype, qianghao)
      /// and writes every row to the debug log.
      /// </summary>
      /// <returns>The rows currently stored in the POS table.</returns>
      private IEnumerable<DataRow> LoadOrderedConfigFromPosDb()
      {
          try
          {
              var siteNozzlesDataTableInPosDb = new DataTable();
              using (var posConn = new SqlConnection(GenericSinochemEpsApp.AppSettings["PosDatabaseConnStr"]))
              using (var sda = new SqlDataAdapter("select * from jyjpz order by jihao, abtype, qianghao", posConn))
              {
                  sda.Fill(siteNozzlesDataTableInPosDb);
              }

              debugLogger.Debug("siteNozzlesDataTableInPosDb: ");
              foreach (DataRow oneRow in siteNozzlesDataTableInPosDb.Rows)
              {
                  debugLogger.Debug(" JiHao: " + oneRow["jihao"] + ", sideId(abtype): " + oneRow["abtype"]
                      + ", nozzleLogicalId(qianghao): " + oneRow["qianghao"] +
                      ", siteLevelNozzleId(luojiqh): " + oneRow["luojiqh"]);
              }

              return siteNozzlesDataTableInPosDb.Rows.Cast<DataRow>();
          }
          catch (Exception ex)
          {
              debugLogger.Error("LoadConfigFromPosDb exceptioned, detail: " + ex);
              throw;
          }
      }

      /// <summary>
      /// Stops the scanning timer and releases it. Safe to call more than once.
      /// </summary>
      public void Dispose()
      {
          if (Interlocked.Exchange(ref this.isDisposed, 1) == 1)
          {
              return;
          }

          if (this.scannerTimer != null)
          {
              this.scannerTimer.Stop();
              this.scannerTimer.Elapsed -= scannerTimer_Elapsed;
              this.scannerTimer.Dispose();
          }
      }
  }
  262. }