using SinochemInternetPlusApp;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Data.Odbc;
using System.Data.SqlClient;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Timers;
using Wayne.Lib;
using Wayne.Lib.Log;

namespace SinoChemFC2PosProxy
{
    /// <summary>
    /// Periodically reads the nozzle configuration from the FC database, derives the
    /// jiHao / sideId / siteLevelNozzleId columns from the configured mappings, and
    /// overwrites the POS table <c>jyjpz</c> with the result.
    /// </summary>
    internal class SiteConfigScannerCommunicator : ICommunicator, IDisposable
    {
        private static readonly NLog.Logger debugLogger =
            NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("PumpHandler");

        /// <summary>
        /// 0 for not started, 1 for started already.
        /// </summary>
        private int isStarted = 0;

        /// <summary>
        /// PumpIdAsSideA:PumpIdAsSideB:JiHao
        /// </summary>
        private readonly IEnumerable<Tuple<int, int, int>> pumpSidePairs = null;

        /// <summary>
        /// pumpId:RealHoseLogicalId:HuiTianSiteLevelHoseId
        /// </summary>
        public static IEnumerable<Tuple<int, int, int>> nozzleRemappingPairs = null;

        private readonly System.Timers.Timer scannerTimer;

        /// <summary>
        /// Creates a site configuration scanner that syncs the POS database with the FC database.
        /// Calling <see cref="Start"/> forces a sync right away and then starts the periodic scan.
        /// </summary>
        /// <param name="scanningInterval">Timer interval passed to <see cref="System.Timers.Timer"/> (milliseconds).</param>
        /// <exception cref="ArgumentException">The mandatory 'PumpSideMapping' app setting is missing or empty.</exception>
        public SiteConfigScannerCommunicator(int scanningInterval)
        {
            var pumpSideMappingRawString = NormalizeMappingSetting(App.AppSettings["PumpSideMapping"]);
            if (pumpSideMappingRawString.Length == 0)
            {
                throw new ArgumentException(
                    "App setting 'PumpSideMapping' is missing or empty, expected entries like 'PumpIdAsSideA:PumpIdAsSideB:JiHao;...'.");
            }

            this.pumpSidePairs = ParseTriples(pumpSideMappingRawString);

            // The remapping setting is optional; a missing or empty value means "no remapping".
            var nozzleRemappingRawString = NormalizeMappingSetting(App.AppSettings["forceMappingFusionHoseToHuiTianHose"]);
            if (nozzleRemappingRawString.Length != 0)
            {
                debugLogger.Info("will parse forceMappingFusionHoseToHuiTianHose, raw string in config is: "
                    + nozzleRemappingRawString);
                nozzleRemappingPairs = ParseTriples(nozzleRemappingRawString);
            }

            this.scannerTimer = new System.Timers.Timer(scanningInterval);
            this.scannerTimer.Elapsed += scannerTimer_Elapsed;
        }

        /// <summary>
        /// Strips all spaces and trailing ';' separators from a raw mapping setting.
        /// Returns an empty string when the setting is missing.
        /// </summary>
        private static string NormalizeMappingSetting(string rawSetting)
        {
            return rawSetting == null
                ? string.Empty
                : rawSetting.Trim().Replace(" ", "").TrimEnd(';');
        }

        /// <summary>
        /// Parses 'a:b:c;a:b:c;...' into integer triples. The returned sequence is deferred:
        /// malformed entries surface as exceptions when it is enumerated, not when this method is called.
        /// </summary>
        private static IEnumerable<Tuple<int, int, int>> ParseTriples(string normalizedSetting)
        {
            return normalizedSetting
                .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
                .Select(entry =>
                {
                    var parts = entry.Split(':');
                    return new Tuple<int, int, int>(
                        int.Parse(parts[0]), int.Parse(parts[1]), int.Parse(parts[2]));
                });
        }

        // NOTE(review): System.Timers.Timer can raise Elapsed again while a previous handler is still
        // running; if a scan can take longer than the interval, consider guarding against overlap.
        void scannerTimer_Elapsed(object sender, ElapsedEventArgs e)
        {
            this.PerformCompareAndUpdate();
        }

        /// <summary>
        /// Reads the config from the FC database and overwrites the POS database table with it.
        /// </summary>
        /// <returns>true for updated, false for not updated (empty FC config or any failure, which is logged).</returns>
        private bool PerformCompareAndUpdate()
        {
            try
            {
                var configInFcDb = this.LoadOrderedConfigFromFcDb();

                // The POS rows are not compared yet (the table is always overwritten below). The call is
                // kept because it logs the current POS content and fails the round early, before the
                // site config is published, when the POS database cannot be read.
                this.LoadOrderedConfigFromPosDb();

                if (configInFcDb == null || !configInFcDb.Any())
                {
                    debugLogger.Info("config In FC Database is empty, skip this round");
                    return false;
                }

                SiteConfigUtility.Default.UpdateLatestSiteConfig(configInFcDb);

                // for now, always overwrite the whole table in POS database without further check
                // for rows or columns level.
                OverwritePosSiteConfig(configInFcDb);
                return true;
            }
            catch (Exception ex)
            {
                debugLogger.Error("PerformCompareAndUpdate exceptioned, detail: " + ex);
                return false;
            }
        }

        /// <summary>
        /// Replaces the content of POS table jyjpz with the given rows. Truncate and inserts run in a
        /// single transaction so a failed insert does not leave the table empty.
        /// </summary>
        /// <param name="siteConfigRows">Rows carrying jiHao, sideId, HoseLogicalId and siteLevelNozzleId.</param>
        private static void OverwritePosSiteConfig(IEnumerable<DataRow> siteConfigRows)
        {
            const string truncateSql = "TRUNCATE table jyjpz";
            const string insertSql =
                "INSERT jyjpz (jihao, abtype, qianghao, luojiqh) VALUES (@jihao, @abtype, @qianghao, @luojiqh)";

            using (var posConn = new SqlConnection(App.AppSettings["PosDatabaseConnStr"]))
            {
                posConn.Open();

                // Disposing the transaction without Commit() rolls everything back, including the TRUNCATE.
                using (var transaction = posConn.BeginTransaction())
                {
                    using (var truncateCmd = new SqlCommand(truncateSql, posConn, transaction))
                    {
                        truncateCmd.ExecuteNonQuery();
                    }

                    var insertedRows = 0;
                    using (var insertCmd = new SqlCommand(insertSql, posConn, transaction))
                    {
                        // jihao, qianghao and luojiqh were sent as numeric literals, abtype as a quoted string.
                        var jiHaoParam = insertCmd.Parameters.Add("@jihao", SqlDbType.Int);
                        var sideParam = insertCmd.Parameters.Add("@abtype", SqlDbType.VarChar, 8);
                        var hoseParam = insertCmd.Parameters.Add("@qianghao", SqlDbType.Int);
                        var siteLevelParam = insertCmd.Parameters.Add("@luojiqh", SqlDbType.Int);

                        foreach (var row in siteConfigRows)
                        {
                            jiHaoParam.Value = int.Parse(row["jiHao"].ToString());
                            sideParam.Value = row["sideId"].ToString();
                            hoseParam.Value = int.Parse(row["HoseLogicalId"].ToString());
                            siteLevelParam.Value = int.Parse(row["siteLevelNozzleId"].ToString());
                            insertCmd.ExecuteNonQuery();
                            insertedRows++;
                        }
                    }

                    transaction.Commit();
                    debugLogger.Info("truncateAndInsertSiteConfigCmd: " + truncateSql + "; " + insertSql
                        + " (rows inserted: " + insertedRows + ")");
                }
            }
        }

        /// <summary>
        /// True once <see cref="Start"/> has succeeded.
        /// </summary>
        public bool IsStarted
        {
            get { return this.isStarted == 1; }
        }

        /// <summary>
        /// Performs the initial sync and, when it succeeds, starts the periodic scan.
        /// </summary>
        /// <returns>true when the initial sync succeeded and the timer was started; otherwise false.</returns>
        /// <exception cref="InvalidOperationException">The communicator is already started (or starting).</exception>
        public bool Start()
        {
            if (0 != Interlocked.CompareExchange(ref this.isStarted, 1, 0))
            {
                throw new InvalidOperationException("Already started.");
            }

            // the initial load must be succeed!
            if (!this.PerformCompareAndUpdate())
            {
                // Roll the flag back so IsStarted stays false and the caller may retry Start().
                Interlocked.Exchange(ref this.isStarted, 0);
                return false;
            }

            this.scannerTimer.Start();
            return true;
        }

        /// <summary>
        /// Loads the hose rows from the FC database, fills in jiHao / sideId from the pump side mapping,
        /// orders them and numbers them with a site level nozzle id (optionally remapped by config).
        /// </summary>
        /// <returns>The ordered, fully populated rows.</returns>
        /// <exception cref="ArgumentException">A pump id is found in neither side of the pump side mapping.</exception>
        private IEnumerable<DataRow> LoadOrderedConfigFromFcDb()
        {
            try
            {
                // NOTE(review): the query uses 'id::int', which is PostgreSQL cast syntax, while the
                // connection is a SqlConnection - confirm which database engine FCDatabaseConnStr targets.
                using (var fcOdbcConnection = new SqlConnection(App.AppSettings["FCDatabaseConnStr"]))
                using (var oda = new SqlDataAdapter(
                    "select id as PumpId, device_id as HoseName, param_value as HoseLogicalId " +
                    " from config_values where library = 'pump' and device_id like 'hose%' and parameter ='logical_id' " +
                    " order by id::int, device_id", fcOdbcConnection))
                {
                    var siteNozzlesDataTableInFcDb = new DataTable();
                    oda.Fill(siteNozzlesDataTableInFcDb);

                    /* the returned data from postgresql db is like:
                         PumpId  HoseName  HoseLogicalId
                         "1"     "hose3"   "3"
                         "1"     "hose4"   "4"
                         "1"     "hose2"   "2"
                         "1"     "hose1"   "1"
                         "10"    "hose2"   "2"
                         "10"    "hose1"   "1"
                         "10"    "hose3"   "3"
                         "10"    "hose4"   "4"
                     */

                    // PetroChina defined a id of: 'jiHao' which stands for a physical pump(2 sides).
                    siteNozzlesDataTableInFcDb.Columns.Add(new DataColumn("jiHao"));
                    // PetroChina need another id of site overall id for a nozzle, start from 1.
                    // we calculate this based on jiHao and HoseLogicalId
                    siteNozzlesDataTableInFcDb.Columns.Add(new DataColumn("siteLevelNozzleId"));
                    // as doc defined, side A is 0, side B is 1.
                    siteNozzlesDataTableInFcDb.Columns.Add(new DataColumn("sideId"));

                    // The mapping sequences are deferred queries over the raw config string;
                    // parse them once per scan instead of once per row.
                    var sidePairs = this.pumpSidePairs.ToList();
                    var remappingPairs = nozzleRemappingPairs == null ? null : nozzleRemappingPairs.ToList();

                    foreach (DataRow row in siteNozzlesDataTableInFcDb.Rows)
                    {
                        var pumpId = int.Parse(row["pumpid"].ToString());
                        var sideA = sidePairs.FirstOrDefault(p => p.Item1 == pumpId);
                        if (sideA != null)
                        {
                            row["sideId"] = "0";
                            row["jiHao"] = sideA.Item3;
                            continue;
                        }

                        var sideB = sidePairs.FirstOrDefault(p => p.Item2 == pumpId);
                        if (sideB != null)
                        {
                            row["sideId"] = "1";
                            row["jiHao"] = sideB.Item3;
                            continue;
                        }

                        throw new ArgumentException("Pump with pumpId: " + pumpId
                            + " is neither side A nor side B, pls check Side definition config file.");
                    }

                    // Materialize the ordering once; the original deferred query was re-sorted on every
                    // Count()/ElementAt() call and again by every consumer of the returned sequence.
                    // NOTE(review): HoseLogicalId is compared as the stored object; if it is a string,
                    // "10" sorts before "2" - confirm whether numeric ordering is intended.
                    var sortedSiteNozzlesDataRowsInFcDb = siteNozzlesDataTableInFcDb.Rows.Cast<DataRow>()
                        .OrderBy(r => int.Parse(r["jiHao"].ToString()))
                        .ThenBy(r => r["sideId"])
                        .ThenBy(r => r["HoseLogicalId"])
                        .ToList();

                    debugLogger.Debug("sortedSiteNozzlesDataRowsInFcDb: ");

                    // finally we get the ordered sequence, now give them sitelevel nozzle id.
                    for (var i = 0; i < sortedSiteNozzlesDataRowsInFcDb.Count; i++)
                    {
                        var oneRow = sortedSiteNozzlesDataRowsInFcDb[i];
                        oneRow["siteLevelNozzleId"] = i + 1;
                        debugLogger.Debug(" JiHao: " + oneRow["jiHao"] + ", sideId: " + oneRow["sideId"] +
                                          ", nozzleLogicalId: " + oneRow["HoseLogicalId"] +
                                          ", siteLevelNozzleId: " + oneRow["siteLevelNozzleId"]
                                          + ", pumpId: " + oneRow["pumpid"]);

                        if (remappingPairs == null)
                        {
                            continue;
                        }

                        var pumpId = int.Parse(oneRow["pumpid"].ToString());
                        var fusionHoseLogicalId = int.Parse(oneRow["HoseLogicalId"].ToString());
                        var matched = remappingPairs.FirstOrDefault(
                            m => m.Item1 == pumpId && m.Item2 == fusionHoseLogicalId);
                        if (matched != null)
                        {
                            debugLogger.Debug("Found nozzleRemappingPairs for pumpId: " + pumpId
                                + " with HoseLogicalId: " + fusionHoseLogicalId + ", "
                                + "will remapping the bound siteLevelNozzleId from: " + oneRow["siteLevelNozzleId"]
                                + " to: " + matched.Item3);
                            oneRow["siteLevelNozzleId"] = matched.Item3;
                        }
                    }

                    return sortedSiteNozzlesDataRowsInFcDb;
                }
            }
            catch (Exception ex)
            {
                debugLogger.Error("LoadConfigFromFcDb exceptioned, detail: " + ex);
                throw;
            }
        }

        /// <summary>
        /// Loads the current content of POS table jyjpz (ordered by jihao, abtype, qianghao)
        /// and writes every row to the debug log.
        /// </summary>
        /// <returns>The rows currently stored in the POS table.</returns>
        private IEnumerable<DataRow> LoadOrderedConfigFromPosDb()
        {
            try
            {
                using (var posConn = new SqlConnection(App.AppSettings["PosDatabaseConnStr"]))
                using (var sda = new SqlDataAdapter("select * from jyjpz order by jihao, abtype, qianghao", posConn))
                {
                    var siteNozzlesDataTableInPosDb = new DataTable();
                    sda.Fill(siteNozzlesDataTableInPosDb);

                    debugLogger.Debug("siteNozzlesDataTableInPosDb: ");
                    foreach (DataRow oneRow in siteNozzlesDataTableInPosDb.Rows)
                    {
                        debugLogger.Debug(" JiHao: " + oneRow["jihao"] + ", sideId(abtype): " + oneRow["abtype"] +
                                          ", nozzleLogicalId(qianghao): " + oneRow["qianghao"] +
                                          ", siteLevelNozzleId(luojiqh): " + oneRow["luojiqh"]);
                    }

                    return siteNozzlesDataTableInPosDb.Rows.Cast<DataRow>();
                }
            }
            catch (Exception ex)
            {
                debugLogger.Error("LoadConfigFromPosDb exceptioned, detail: " + ex);
                throw;
            }
        }

        /// <summary>
        /// Stops the periodic scan, detaches the handler and releases the timer.
        /// </summary>
        public void Dispose()
        {
            this.scannerTimer.Stop();
            this.scannerTimer.Elapsed -= scannerTimer_Elapsed;
            this.scannerTimer.Dispose();
        }
    }
}