MessageRouterConnection.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. using System;
  2. using System.Text;
  3. using System.IO;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Threading;
  7. namespace MessageRouter
  8. {
  9. public delegate void OnMessageRouterConnectDelegate(MessageRouterConnection client, bool reconnect);
  10. public delegate void OnMessageRouterDisconnectDelegate(MessageRouterConnection client, Exception e);
  11. public delegate void OnMessageRouterPacketReceivedDelegate(MessageRouterConnection client, MessageRouterPacket p);
  12. /// <summary>
  13. /// Abstraction of Fusion Message Router Packet
  14. /// <remoarks>
  15. /// Fusion Message Router Message Format:
  16. /// <len>|<crypt format>|<message version>|<user id>|<message type>|<event type>|<destination>|<origin>|<parameters…>|^
  17. /// <len>: 5 bytes fixed length field containing the length of the message from the <message version> till the ^ included.
  18. /// <crypt format>: 1 byte fixed length field that describes the crypt format of the message. Fixed value '5'. This means no encryption.
  19. /// </remarks>
  20. /// </summary>
  21. public class MessageRouterPacket
  22. {
  23. public static int MAX_PACKET_LENGTH = 1024 * 1024;
  24. public static int LEN_LENGTH = 5;
  25. public static int CRYPT_FORMAT_LENGTH = 1;
  26. public static int PIPE_LENGTH = 1;
  27. public readonly char cryptFormat;
  28. public readonly byte[] buf;
  29. public MessageRouterPacket(char cryptFormat, byte[] buf)
  30. {
  31. this.cryptFormat = cryptFormat;
  32. this.buf = buf;
  33. }
  34. }
  35. public class MessageRouterConnection : IDisposable
  36. {
  37. /// <summary> Remote end point </summary>
  38. private IPEndPoint remoteEp;
  39. /// <summary> Port of local endpoint </summary>
  40. private int localPort;
  41. /// <summary> Connection delay </summary>
  42. private readonly int delay;
  43. /// <summary> The default delay between atttempts to connect (in ms). </summary>
  44. private const int DEFAULT_DELAY = 5000;
  45. #region Property accessor
  46. public IPEndPoint RemoteEp
  47. {
  48. get { return remoteEp; }
  49. set { remoteEp = value; }
  50. }
  51. public int LocalPort
  52. {
  53. get { return localPort; }
  54. }
  55. #endregion
  56. /// <summary>
  57. /// Constructs the ipc client with default write queue length and default delay.
  58. /// </summary>
  59. /// <param name="remoteEp">The remote endpoint to use. May be specified as
  60. /// null and set later using assignment to RemoteEp.</param>
  61. public MessageRouterConnection(IPEndPoint remoteEp)
  62. : this(remoteEp, DEFAULT_DELAY)
  63. {
  64. }
  65. /// <summary>
  66. /// Construct IPC client
  67. /// </summary>
  68. /// <param name="remoteEp">The remote endpoint to use.</param>
  69. /// <param name="delay">Delay in milliseconds between attempts to connect.
  70. /// specify 0 to not delay any more than necessary between attempts.
  71. /// </param>
  72. public MessageRouterConnection(IPEndPoint remoteEp, int delay)
  73. {
  74. if (delay < 0)
  75. throw new ArgumentException("Invalid delay");
  76. this.remoteEp = remoteEp;
  77. this.delay = delay;
  78. this.localPort = 0;
  79. }
  80. /// <summary>
  81. /// Helper to convert object state to readable string
  82. /// </summary>
  83. /// <returns>String with object properties</returns>
  84. public override String ToString()
  85. {
  86. return "MessageRouterConnection(" + remoteEp + ", " + state + ")";
  87. }
  88. #region Socket event delegates
  89. /// <summary> Delegate to inform when a connection is established. </summary>
  90. public OnMessageRouterConnectDelegate OnConnect;
  91. /// <summary> Delegate to inform when a connection is closed. </summary>
  92. public OnMessageRouterDisconnectDelegate OnDisconnect;
  93. /// <summary> Delegate to inform when a packet is received. </summary>
  94. public OnMessageRouterPacketReceivedDelegate OnPacketReceived;
  95. #endregion
  96. #region Packet I/O
  97. /// <summary>
  98. /// Read packet from memory stream
  99. /// </summary>
  100. /// <param name="s">Memory stream</param>
  101. /// <returns>Packet from memory stream</returns>
  102. public MessageRouterPacket ReadPacket(Stream s)
  103. {
  104. // Read message length, 5 bytes
  105. int len = ReadLen(s);
  106. if (len == 0)
  107. throw new IOException("Message router packet len = 0");
  108. if (len > MessageRouterPacket.MAX_PACKET_LENGTH)
  109. throw new IOException("Message router packet len > Max packet size");
  110. // Read Pipe char
  111. if (!ReadPipe(s))
  112. throw new IOException("Malformed router message");
  113. // Read crypto format
  114. char cryptFormat = ReadCryptFormat(s);
  115. // Read Pipe char
  116. if (!ReadPipe(s))
  117. throw new IOException("Malformed router message");
  118. byte[] buf = new byte[len];
  119. ReadBytes(s, buf);
  120. return new MessageRouterPacket(cryptFormat, buf);
  121. }
  122. private int ReadLen(Stream s)
  123. {
  124. byte[] buf = new byte[MessageRouterPacket.LEN_LENGTH];
  125. ReadBytes(s, buf);
  126. Encoding encoding = new ASCIIEncoding();
  127. //Console.WriteLine("READ LEN: " + encoding.GetString(buf));
  128. String str = encoding.GetString(buf).TrimStart('0');
  129. return Convert.ToInt32(str);
  130. }
  131. private char ReadCryptFormat(Stream s)
  132. {
  133. byte[] buf = new byte[MessageRouterPacket.CRYPT_FORMAT_LENGTH];
  134. ReadBytes(s, buf);
  135. char cf = (char)buf[0];
  136. //Console.WriteLine("READ CRYPT: " + cf);
  137. return cf;
  138. }
  139. private bool ReadPipe(Stream s)
  140. {
  141. byte[] buf = new byte[MessageRouterPacket.PIPE_LENGTH];
  142. ReadBytes(s, buf);
  143. if (buf[0] != 0x7C)
  144. return false;
  145. return true;
  146. }
  147. /// <summary>
  148. /// Read byte array from socket stream
  149. /// </summary>
  150. /// <param name="s">Socket data stream</param>
  151. /// <param name="buf">Data buffer</param>
  152. private void ReadBytes(Stream s, byte[] buf)
  153. {
  154. if (s == null)
  155. throw new IOException("Socket closed");
  156. int n = buf.Length;
  157. int i = 0;
  158. while (i < n)
  159. {
  160. int k = s.Read(buf, i, n - i);
  161. if (k == 0)
  162. throw new IOException("Socket eof");
  163. i += k;
  164. }
  165. }
  166. private void WriteLen(Stream s, int len)
  167. {
  168. byte[] buf = new byte[MessageRouterPacket.LEN_LENGTH];
  169. String str = len.ToString();
  170. int padding = MessageRouterPacket.LEN_LENGTH - str.Length;
  171. for (int i = 0; i < padding; i++)
  172. str = "0" + str;
  173. System.Text.ASCIIEncoding encoding = new System.Text.ASCIIEncoding();
  174. WriteBytes(s, encoding.GetBytes(str));
  175. }
  176. private void WriteCryptFormat(Stream s, char cryptFormat)
  177. {
  178. byte[] buf = new byte[MessageRouterPacket.CRYPT_FORMAT_LENGTH];
  179. buf[0] = (byte)cryptFormat;
  180. WriteBytes(s, buf);
  181. }
  182. private void WritePipe(Stream s)
  183. {
  184. byte[] pipe = new Byte[MessageRouterPacket.PIPE_LENGTH];
  185. pipe[0] = 0x7C;
  186. WriteBytes(s, pipe);
  187. }
  188. // Write byte array into socket stream
  189. private void WriteBytes(Stream s, byte[] buf)
  190. {
  191. if (s == null)
  192. throw new IOException("Socket closed");
  193. s.Write(buf, 0, buf.Length);
  194. }
  195. /// <summary>
  196. /// Write packet into socket stream
  197. /// </summary>
  198. /// <param name="s">Data stream</param>
  199. /// <param name="p">Data packet</param>
  200. private void WritePacket(Stream s, MessageRouterPacket p)
  201. {
  202. WriteLen(s, p.buf == null ? 0 : p.buf.Length);
  203. WritePipe(s);
  204. WriteCryptFormat(s, p.cryptFormat);
  205. WritePipe(s);
  206. WriteBytes(s, p.buf);
  207. s.Flush();
  208. }
  209. #endregion
  210. #region Socket state management
  211. enum State { CLOSED, OPENING, OPENED };
  212. /// <summary> Socket state </summary>
  213. private State _state;
  214. /// <summary> Socket state accessor </summary>
  215. private State state
  216. {
  217. get
  218. {
  219. return _state;
  220. }
  221. set
  222. {
  223. lock (this)
  224. {
  225. _state = value;
  226. Monitor.PulseAll(this);
  227. }
  228. }
  229. }
  230. #endregion
  231. #region Connection Management
  232. /// <summary> TCP client object for this IPC client </summary>
  233. private TcpClient tcpClient;
  234. /// <summary> Input stream for this TCP client </summary>
  235. private Stream inputStream;
  236. /// <summary> Output stream for this TCP client </summary>
  237. private Stream outputStream;
  238. /// <summary> Flag to indicate TCP connection is reconnected </summary>
  239. private bool reconnect;
  240. /// <summary>
  241. /// Validate socket state before opening
  242. /// </summary>
  243. private void PrepareToOpen()
  244. {
  245. if (state != State.CLOSED)
  246. throw new InvalidOperationException("Socket already open");
  247. if (remoteEp == null)
  248. throw new InvalidOperationException("Invalid remote end point");
  249. if (OnPacketReceived == null)
  250. throw new InvalidOperationException("Missing deletegate for packet received event");
  251. state = State.OPENING;
  252. }
  253. /// <summary>
  254. /// Open a TCP connection to IPC server
  255. /// </summary>
  256. private void OpenConnection()
  257. {
  258. lock (this)
  259. {
  260. if (state != State.OPENING)
  261. throw new Exception("Socket state not opening - " + state);
  262. tcpClient = new TcpClient();
  263. try
  264. {
  265. tcpClient.Connect(remoteEp); // connect to IPC server
  266. tcpClient.NoDelay = true; // disable nagle's algorithm
  267. NetworkStream networkStream = tcpClient.GetStream();
  268. inputStream = new BufferedStream(networkStream);
  269. outputStream = new BufferedStream(networkStream);
  270. state = State.OPENED;
  271. IPEndPoint localEndPoint = tcpClient.Client.LocalEndPoint as IPEndPoint;
  272. localPort = localEndPoint.Port;
  273. if (OnConnect != null)
  274. OnConnect(this, reconnect); // notify connection is established
  275. reconnect = true; // show that we've connected at least once
  276. }
  277. catch (Exception e)
  278. {
  279. CloseConnection(e);
  280. throw e;
  281. }
  282. }
  283. }
  284. /// <summary>
  285. /// Read data from input stream
  286. /// </summary>
  287. private void ReadConnection()
  288. {
  289. Stream xis = inputStream;
  290. if (xis == null)
  291. return;
  292. while (state == State.OPENED)
  293. {
  294. MessageRouterPacket p = ReadPacket(xis);
  295. if (OnPacketReceived != null)
  296. OnPacketReceived(this, p);
  297. }
  298. }
  299. /// <summary>
  300. /// Close socket connection to IPC server
  301. /// </summary>
  302. /// <param name="e">Exception info</param>
  303. private void CloseConnection(Exception e)
  304. {
  305. lock (this)
  306. {
  307. if (tcpClient != null)
  308. {
  309. if (inputStream != null)
  310. {
  311. try
  312. {
  313. inputStream.Close();
  314. }
  315. catch { }
  316. inputStream = null;
  317. }
  318. if (outputStream != null)
  319. {
  320. try
  321. {
  322. outputStream.Close();
  323. }
  324. catch { }
  325. outputStream = null;
  326. }
  327. try
  328. {
  329. tcpClient.Close();
  330. }
  331. catch { }
  332. tcpClient = null;
  333. localPort = 0;
  334. if (state == State.OPENED)
  335. state = State.OPENING; // force reconnect
  336. if (OnDisconnect != null)
  337. OnDisconnect(this, e);
  338. }
  339. }
  340. }
  341. /// <summary>
  342. /// Permanently disposes of this object.
  343. /// </summary>
  344. public void Dispose()
  345. {
  346. Close(1); // close and don't wait!
  347. }
  348. #endregion
  349. #region Asynchronous Session Management
  350. /// <summary>
  351. /// Start socket connection thread
  352. /// </summary>
  353. public void Start()
  354. {
  355. lock (this)
  356. {
  357. PrepareToOpen();
  358. thread = StartThread(new ThreadStart(Run), "Run");
  359. }
  360. }
  361. /// <summary>
  362. /// Worker thead for connection management
  363. /// </summary>
  364. private void Run()
  365. {
  366. while (state != State.CLOSED)
  367. {
  368. OpenConnectionX();
  369. try
  370. {
  371. ReadConnection();
  372. CloseConnection(null);
  373. }
  374. catch (ThreadAbortException)
  375. { // nothing
  376. }
  377. catch (Exception e)
  378. {
  379. CloseConnection(e);
  380. }
  381. }
  382. }
  383. /// <summary>
  384. /// Open a TCP connection to IPC server until successful
  385. /// </summary>
  386. private void OpenConnectionX()
  387. {
  388. while (state == State.OPENING)
  389. {
  390. lock (this)
  391. {
  392. try
  393. {
  394. OpenConnection();
  395. return;
  396. }
  397. catch (SocketException)
  398. {
  399. if (state == State.OPENING)
  400. Monitor.Wait(this, delay);
  401. }
  402. }
  403. }
  404. }
  405. /// <summary>
  406. /// Close a socket connection without wait
  407. /// </summary>
  408. public void Close()
  409. {
  410. Close(0);
  411. }
  412. /// <summary>
  413. /// Close a socket connection with timeout
  414. /// </summary>
  415. /// <param name="timeout">Time to wait for socket to close</param>
  416. public void Close(int timeout)
  417. {
  418. if (timeout < 0)
  419. throw new ArgumentException("Invalid timeout value");
  420. lock (this)
  421. {
  422. if (state != State.CLOSED)
  423. {
  424. state = State.CLOSED;
  425. CloseConnection(new Exception("Socket closed"));
  426. }
  427. }
  428. if (thread != Thread.CurrentThread)
  429. {
  430. StopThread(thread, timeout);
  431. thread = null;
  432. }
  433. }
  434. #endregion
  435. #region Synchronous Session Management
  436. /// <summary>
  437. /// Open socket connection to IPC server synchronously
  438. /// </summary>
  439. /// <returns>True if connection established</returns>
  440. public bool Open()
  441. {
  442. lock (this)
  443. {
  444. try
  445. {
  446. PrepareToOpen();
  447. OpenConnection();
  448. thread = StartThread(new ThreadStart(SyncReader), "SyncReader");
  449. return true;
  450. }
  451. catch (SocketException e)
  452. {
  453. state = State.CLOSED;
  454. CloseConnection(e);
  455. return false;
  456. }
  457. }
  458. }
  459. /// <summary>
  460. /// Worker thread logic to read data from input stream
  461. /// </summary>
  462. private void SyncReader()
  463. {
  464. try
  465. {
  466. ReadConnection();
  467. FinishSyncReader(null);
  468. }
  469. catch (Exception e)
  470. {
  471. FinishSyncReader(e);
  472. }
  473. }
  474. /// <summary>
  475. /// Close socket connection to IPC server
  476. /// </summary>
  477. /// <param name="e"></param>
  478. private void FinishSyncReader(Exception e)
  479. {
  480. lock (this)
  481. {
  482. if (state != State.CLOSED)
  483. {
  484. state = State.CLOSED;
  485. CloseConnection(e);
  486. thread = null;
  487. }
  488. }
  489. }
  490. #endregion
  491. #region Thread Management
  492. /// <summary> The worker thead to create </summary>
  493. private Thread thread;
  494. /// <summary>
  495. /// Start worker thread
  496. /// </summary>
  497. /// <param name="start">Main function for this worker thread</param>
  498. /// <param name="what">Thread description</param>
  499. /// <returns>The created thread</returns>
  500. private Thread StartThread(ThreadStart start, string what)
  501. {
  502. Thread t = new Thread(start);
  503. t.IsBackground = true;
  504. t.Name = ToString() + " " + what;
  505. t.Start();
  506. return t;
  507. }
  508. /// <summary>
  509. /// Stop the worker thread
  510. /// </summary>
  511. /// <param name="t">Thread object to stop</param>
  512. /// <param name="timeout">Time to wait</param>
  513. private void StopThread(Thread t, int timeout)
  514. {
  515. if (t == null)
  516. return;
  517. if (!t.Join(timeout))
  518. t.Abort();
  519. }
  520. #endregion
  521. #region Outgoing Data
  522. public bool Write(char cryptFormat, byte[] data)
  523. {
  524. try
  525. {
  526. MessageRouterPacket p = new MessageRouterPacket(cryptFormat, data);
  527. lock (writeSync)
  528. {
  529. WritePacket(outputStream, p);
  530. }
  531. }
  532. catch (Exception e)
  533. {
  534. CloseConnection(e);
  535. return false;
  536. }
  537. return true;
  538. }
  539. private Object writeSync = new Object();
  540. #endregion
  541. }
  542. }