using SinochemInternetPlusApp;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Data.Odbc;
using System.Data.SqlClient;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Timers;
using Wayne.Lib;
using Wayne.Lib.Log;
namespace SinoChemFC2PosProxy
{
/// <summary>
/// Periodically reads the pump/hose configuration from the FC database, derives the
/// POS-side identifiers (jiHao, side id, site level nozzle id) and overwrites the
/// <c>jyjpz</c> table of the POS database with the result.
/// </summary>
internal class SiteConfigScannerCommunicator : ICommunicator, IDisposable
{
    private static readonly NLog.Logger debugLogger =
        NLog.LogManager.LoadConfiguration("nlog.config").GetLogger("PumpHandler");

    /// <summary>
    /// 0 for not started, 1 for started already.
    /// </summary>
    private int isStarted = 0;

    /// <summary>
    /// PumpIdAsSideA:PumpIdAsSideB:JiHao
    /// </summary>
    private readonly IEnumerable<Tuple<int, int, int>> pumpSidePairs = null;

    /// <summary>
    /// pumpId:RealHoseLogicalId:HuiTianSiteLevelHoseId
    /// </summary>
    public static IEnumerable<Tuple<int, int, int>> nozzleRemappingPairs = null;

    private System.Timers.Timer scannerTimer;

    /// <summary>
    /// Start a site configuration scanner and try sync the FC db with POS's.
    /// call start() for this communicator will force a sync right now.
    /// </summary>
    /// <param name="scanningInterval">
    /// Interval handed to <see cref="System.Timers.Timer"/>, i.e. milliseconds between two scans.
    /// </param>
    /// <exception cref="InvalidOperationException">The 'PumpSideMapping' setting is missing or empty.</exception>
    /// <exception cref="FormatException">A mapping entry is not three ':' separated integers.</exception>
    public SiteConfigScannerCommunicator(int scanningInterval)
    {
        var pumpSideMappingRawString = App.AppSettings["PumpSideMapping"];
        if (string.IsNullOrWhiteSpace(pumpSideMappingRawString))
        {
            throw new InvalidOperationException("App setting 'PumpSideMapping' is missing or empty.");
        }

        // Parsed eagerly so a malformed setting fails here instead of on every timer tick.
        this.pumpSidePairs = ParseMappingTriples(NormalizeMappingSetting(pumpSideMappingRawString));

        var nozzleRemappingSetting = App.AppSettings["forceMappingFusionHoseToHuiTianHose"];
        if (nozzleRemappingSetting != null)
        {
            var nozzleRemappingRawString = NormalizeMappingSetting(nozzleRemappingSetting);
            debugLogger.Info("will parse forceMappingFusionHoseToHuiTianHose, raw string in config is: " + nozzleRemappingRawString);
            nozzleRemappingPairs = ParseMappingTriples(nozzleRemappingRawString);
        }

        this.scannerTimer = new System.Timers.Timer(scanningInterval);
        this.scannerTimer.Elapsed += scannerTimer_Elapsed;
    }

    /// <summary>
    /// Strips surrounding whitespace, embedded spaces and trailing ';' from a mapping setting.
    /// </summary>
    private static string NormalizeMappingSetting(string rawSetting)
    {
        return rawSetting.Trim().Replace(" ", "").TrimEnd(';');
    }

    /// <summary>
    /// Parses a string like "1:2:1;3:4:2" into a materialized list of integer triples.
    /// Empty entries are ignored; fields after the third one are ignored.
    /// </summary>
    private static List<Tuple<int, int, int>> ParseMappingTriples(string normalizedSetting)
    {
        return normalizedSetting
            .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
            .Select(entry =>
            {
                var fields = entry.Split(':');
                if (fields.Length < 3)
                {
                    throw new FormatException(
                        "Mapping entry '" + entry + "' must consist of 3 integers separated by ':'.");
                }

                return Tuple.Create(int.Parse(fields[0]), int.Parse(fields[1]), int.Parse(fields[2]));
            })
            .ToList();
    }

    // NOTE(review): System.Timers.Timer may raise Elapsed again while a previous scan is
    // still running if a scan takes longer than the interval; scans are not serialized here.
    void scannerTimer_Elapsed(object sender, ElapsedEventArgs e)
    {
        this.PerformCompareAndUpdate();
    }

    /// <summary>
    /// read config from FC database, and check if need update to POS database.
    /// </summary>
    /// <returns>true for updated, false for not updated.</returns>
    private bool PerformCompareAndUpdate()
    {
        try
        {
            var configInFcDb = this.LoadOrderedConfigFromFcDb();

            // The POS rows are not compared yet; the call is kept because it writes the
            // current POS table content to the debug log and fails early if POS is unreachable.
            this.LoadOrderedConfigFromPosDb();

            if (configInFcDb == null || !configInFcDb.Any())
            {
                debugLogger.Info("config In FC Database is empty, skip this round");
                return false;
            }

            SiteConfigUtility.Default.UpdateLatestSiteConfig(configInFcDb);

            // for now, always overwrite the whole table in POS database without further check for rows or columns level.
            // One "Select jiHao, 'sideId', HoseLogicalId, siteLevelNozzleId" per row, glued with Union all.
            string bulkInsertCmd = string.Join(
                " Union all ",
                configInFcDb.Select(
                    r => string.Format("Select {0}, '{1}', {2}, {3}",
                        r["jiHao"],
                        r["sideId"],
                        r["HoseLogicalId"],
                        r["siteLevelNozzleId"])));
            string truncateAndInsertCmdText =
                "TRUNCATE table jyjpz; INSERT jyjpz (jihao, abtype, qianghao, luojiqh) " + bulkInsertCmd;
            debugLogger.Info("truncateAndInsertSiteConfigCmd: " + truncateAndInsertCmdText);

            using (var posConn = new SqlConnection(App.AppSettings["PosDatabaseConnStr"]))
            {
                posConn.Open();

                // TRUNCATE and INSERT run in one transaction so that a failing INSERT
                // rolls the TRUNCATE back instead of leaving jyjpz empty.
                using (var transaction = posConn.BeginTransaction())
                using (var truncateAndInsertCmd = new SqlCommand(truncateAndInsertCmdText, posConn, transaction))
                {
                    truncateAndInsertCmd.ExecuteNonQuery();
                    transaction.Commit();
                }

                return true;
            }
        }
        catch (Exception ex)
        {
            debugLogger.Error("PerformCompareAndUpdate exceptioned, detail: " + ex);
            return false;
        }
    }

    /// <summary>
    /// True once <see cref="Start"/> has claimed the started flag and not rolled it back.
    /// </summary>
    public bool IsStarted
    {
        get { return this.isStarted == 1; }
    }

    /// <summary>
    /// Performs the initial sync and, when it succeeds, starts the periodic scan timer.
    /// </summary>
    /// <returns>true when the initial sync succeeded and the timer was started; otherwise false.</returns>
    /// <exception cref="InvalidOperationException">The communicator is already started.</exception>
    public bool Start()
    {
        if (0 != Interlocked.CompareExchange(ref this.isStarted, 1, 0))
        {
            throw new InvalidOperationException("Already started.");
        }

        // the initial load must be succeed!
        if (!this.PerformCompareAndUpdate())
        {
            // Roll the flag back so IsStarted stays false and the caller may retry Start().
            Interlocked.Exchange(ref this.isStarted, 0);
            return false;
        }

        this.scannerTimer.Start();
        return true;
    }

    /// <summary>
    /// Loads all hose logical ids from the FC database, assigns jiHao / sideId from the
    /// pump side mapping, orders the rows by jiHao, sideId and HoseLogicalId and numbers
    /// them (1-based) as siteLevelNozzleId, applying <see cref="nozzleRemappingPairs"/> overrides.
    /// </summary>
    /// <returns>The ordered rows, each carrying the added jiHao, siteLevelNozzleId and sideId columns.</returns>
    /// <exception cref="ArgumentException">A pump id is not listed as side A or side B in the mapping.</exception>
    private IEnumerable<DataRow> LoadOrderedConfigFromFcDb()
    {
        try
        {
            // NOTE(review): "id::int" is PostgreSQL cast syntax while the connection is a
            // SqlConnection - confirm this query is valid for the server behind FCDatabaseConnStr.
            using (var fcConnection = new SqlConnection(App.AppSettings["FCDatabaseConnStr"]))
            using (var fcAdapter = new SqlDataAdapter(
                "select id as PumpId, device_id as HoseName, param_value as HoseLogicalId " +
                " from config_values where library = 'pump' and device_id like 'hose%' and parameter ='logical_id' " +
                " order by id::int, device_id",
                fcConnection))
            {
                var siteNozzlesDataTableInFcDb = new DataTable();
                fcAdapter.Fill(siteNozzlesDataTableInFcDb);
                /* the returned data from postgresql db is like:
                PumpId HoseName HoseLogicalId
                "1" "hose3" "3"
                "1" "hose4" "4"
                "1" "hose2" "2"
                "1" "hose1" "1"
                "10" "hose2" "2"
                "10" "hose1" "1"
                "10" "hose3" "3"
                "10" "hose4" "4"
                */
                // PetroChina defined a id of: 'jiHao' which stands for a physical pump(2 sides).
                siteNozzlesDataTableInFcDb.Columns.Add(new DataColumn("jiHao"));
                // PetroChina need another id of site overall id for a nozzle, start from 1.
                // we calculate this based on jiHao and HoseLogicalId
                siteNozzlesDataTableInFcDb.Columns.Add(new DataColumn("siteLevelNozzleId"));
                // as doc defined, side A is 0, side B is 1.
                siteNozzlesDataTableInFcDb.Columns.Add(new DataColumn("sideId"));

                var siteNozzlesDataRowsInFcDb = siteNozzlesDataTableInFcDb.Rows.Cast<DataRow>().ToList();
                foreach (var row in siteNozzlesDataRowsInFcDb)
                {
                    var pumpId = int.Parse(row["pumpid"].ToString());
                    var possibleSideA = this.pumpSidePairs.FirstOrDefault(p => p.Item1 == pumpId);
                    Tuple<int, int, int> possibleSideB = null;
                    if (possibleSideA != null)
                    {
                        row["sideId"] = "0";
                        row["jiHao"] = possibleSideA.Item3;
                    }
                    else if ((possibleSideB = this.pumpSidePairs.FirstOrDefault(p => p.Item2 == pumpId)) != null)
                    {
                        row["sideId"] = "1";
                        row["jiHao"] = possibleSideB.Item3;
                    }
                    else
                    {
                        throw new ArgumentException("Pump with pumpId: " + pumpId +
                        " is neither side A nor side B, pls check Side definition config file.");
                    }
                }

                // Sort numerically (a string sort would put "10" before "2") and materialize
                // once, so the numbering loop and every later enumeration see the same order
                // without re-sorting.
                var sortedSiteNozzlesDataRowsInFcDb =
                    siteNozzlesDataRowsInFcDb
                        .OrderBy(r => int.Parse(r["jiHao"].ToString()))
                        .ThenBy(r => int.Parse(r["sideId"].ToString()))
                        .ThenBy(r => int.Parse(r["HoseLogicalId"].ToString()))
                        .ToList();

                debugLogger.Debug("sortedSiteNozzlesDataRowsInFcDb: ");
                // finally we get the ordered sequence, now give them sitelevel nozzle id.
                for (var i = 0; i < sortedSiteNozzlesDataRowsInFcDb.Count; i++)
                {
                    var oneRow = sortedSiteNozzlesDataRowsInFcDb[i];
                    oneRow["siteLevelNozzleId"] = i + 1;
                    debugLogger.Debug(" JiHao: " + oneRow["jiHao"] + ", sideId: " + oneRow["sideId"] +
                    ", nozzleLogicalId: " + oneRow["HoseLogicalId"] +
                    ", siteLevelNozzleId: " + oneRow["siteLevelNozzleId"]
                    + ", pumpId: " + oneRow["pumpid"]
                    );

                    if (nozzleRemappingPairs == null)
                    {
                        continue;
                    }

                    // A configured remapping overrides the calculated site level nozzle id.
                    var pumpId = int.Parse(oneRow["pumpid"].ToString());
                    var fusionHoseLogicalId = int.Parse(oneRow["HoseLogicalId"].ToString());
                    var matched = nozzleRemappingPairs.FirstOrDefault(m => m.Item1 == pumpId && m.Item2 == fusionHoseLogicalId);
                    if (matched != null)
                    {
                        debugLogger.Debug("Found nozzleRemappingPairs for pumpId: " + pumpId + " with HoseLogicalId: " + fusionHoseLogicalId + ", " +
                        "will remapping the bound siteLevelNozzleId from: " + oneRow["siteLevelNozzleId"] + " to: " + matched.Item3);
                        oneRow["siteLevelNozzleId"] = matched.Item3;
                    }
                }

                return sortedSiteNozzlesDataRowsInFcDb;
            }
        }
        catch (Exception ex)
        {
            debugLogger.Error("LoadConfigFromFcDb exceptioned, detail: " + ex);
            throw;
        }
    }

    /// <summary>
    /// Loads the current content of the POS table <c>jyjpz</c>, ordered by jihao, abtype
    /// and qianghao, and writes every row to the debug log.
    /// </summary>
    /// <returns>The rows currently stored in <c>jyjpz</c>.</returns>
    private IEnumerable<DataRow> LoadOrderedConfigFromPosDb()
    {
        try
        {
            using (var posConn = new SqlConnection(App.AppSettings["PosDatabaseConnStr"]))
            using (var sda = new SqlDataAdapter("select * from jyjpz order by jihao, abtype, qianghao", posConn))
            {
                var siteNozzlesDataTableInPosDb = new DataTable();
                sda.Fill(siteNozzlesDataTableInPosDb);
                debugLogger.Debug("siteNozzlesDataTableInPosDb: ");
                foreach (DataRow oneRow in siteNozzlesDataTableInPosDb.Rows)
                {
                    debugLogger.Debug(" JiHao: " + oneRow["jihao"] + ", sideId(abtype): " + oneRow["abtype"]
                    + ", nozzleLogicalId(qianghao): " + oneRow["qianghao"] +
                    ", siteLevelNozzleId(luojiqh): " + oneRow["luojiqh"]);
                }

                return siteNozzlesDataTableInPosDb.Rows.Cast<DataRow>().ToList();
            }
        }
        catch (Exception ex)
        {
            debugLogger.Error("LoadConfigFromPosDb exceptioned, detail: " + ex);
            throw;
        }
    }

    /// <summary>
    /// Stops the scan timer, detaches the Elapsed handler and releases the timer.
    /// </summary>
    public void Dispose()
    {
        this.scannerTimer.Stop();
        this.scannerTimer.Elapsed -= scannerTimer_Elapsed;
        this.scannerTimer.Dispose();
    }
}
}