using System;
using System.Text;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace MessageRouter
{
    /// <summary>Invoked after a TCP connection has been established.</summary>
    /// <param name="client">The connection raising the notification.</param>
    /// <param name="reconnect">False on the first successful connect of this object, true on later ones.</param>
    public delegate void OnMessageRouterConnectDelegate(MessageRouterConnection client, bool reconnect);

    /// <summary>Invoked after an established TCP connection has been torn down.</summary>
    /// <param name="client">The connection raising the notification.</param>
    /// <param name="e">The error that caused the close; may be null when the reader loop ended without an error.</param>
    public delegate void OnMessageRouterDisconnectDelegate(MessageRouterConnection client, Exception e);

    /// <summary>Invoked on the reader thread for every packet read from the connection.</summary>
    /// <param name="client">The connection raising the notification.</param>
    /// <param name="p">The packet that was read.</param>
    public delegate void OnMessageRouterPacketReceivedDelegate(MessageRouterConnection client, MessageRouterPacket p);

    /// <summary>
    /// Abstraction of a Fusion Message Router packet.
    ///
    /// Wire layout as read and written by <see cref="MessageRouterConnection"/>:
    ///     LEN | CRYPT | PAYLOAD
    /// LEN:   5-byte fixed-length ASCII decimal field (zero padded on write) holding the
    ///        number of payload bytes that follow the second pipe.
    /// CRYPT: 1-byte fixed-length field describing the crypt format of the message.
    ///        Fixed value '5', which means no encryption.
    /// NOTE(review): the original format notes describe further pipe-separated fields inside
    /// the payload, terminated by '^'. Their names were lost; this code treats the payload as
    /// opaque bytes. Confirm against the router protocol specification.
    /// </summary>
    public class MessageRouterPacket
    {
        /// <summary>Largest payload length accepted when reading a packet.</summary>
        public static int MAX_PACKET_LENGTH = 1024 * 1024;

        /// <summary>Width in bytes of the length field.</summary>
        public static int LEN_LENGTH = 5;

        /// <summary>Width in bytes of the crypt format field.</summary>
        public static int CRYPT_FORMAT_LENGTH = 1;

        /// <summary>Width in bytes of the pipe separator.</summary>
        public static int PIPE_LENGTH = 1;

        /// <summary>Crypt format character carried in the packet header.</summary>
        public readonly char cryptFormat;

        /// <summary>Payload bytes (everything after the second pipe).</summary>
        public readonly byte[] buf;

        /// <summary>Creates a packet from a crypt format and a payload.</summary>
        /// <param name="cryptFormat">Crypt format character.</param>
        /// <param name="buf">Payload bytes; stored by reference, not copied.</param>
        public MessageRouterPacket(char cryptFormat, byte[] buf)
        {
            this.cryptFormat = cryptFormat;
            this.buf = buf;
        }
    }

    /// <summary>
    /// TCP client that exchanges <see cref="MessageRouterPacket"/>s with a remote end point.
    /// It can run asynchronously (<see cref="Start"/>: a worker thread connects, reads and
    /// reconnects until closed) or synchronously (<see cref="Open"/>: connect once on the
    /// calling thread, then read on a worker thread).
    /// </summary>
    public class MessageRouterConnection : IDisposable
    {
        /// <summary>Remote end point.</summary>
        private IPEndPoint remoteEp;

        /// <summary>Port of the local end point; 0 while not connected.</summary>
        private int localPort;

        /// <summary>Delay between connection attempts (in ms).</summary>
        private readonly int delay;

        /// <summary>The default delay between attempts to connect (in ms).</summary>
        private const int DEFAULT_DELAY = 5000;

        /// <summary>
        /// Private gate guarding connection state. Also used for Monitor.Wait/PulseAll so that
        /// a state change wakes a worker that is waiting between connection attempts.
        /// A private object is used instead of <c>this</c> so outside code cannot interfere.
        /// </summary>
        private readonly object stateSync = new object();

        #region Property accessor

        /// <summary>Remote end point to connect to; may be assigned after construction.</summary>
        public IPEndPoint RemoteEp
        {
            get { return remoteEp; }
            set { remoteEp = value; }
        }

        /// <summary>Local port of the current connection, or 0 when not connected.</summary>
        public int LocalPort
        {
            get { return localPort; }
        }

        #endregion

        /// <summary>
        /// Constructs the client with the default delay between connection attempts.
        /// </summary>
        /// <param name="remoteEp">The remote endpoint to use. May be specified as
        /// null and set later using assignment to RemoteEp.</param>
        public MessageRouterConnection(IPEndPoint remoteEp)
            : this(remoteEp, DEFAULT_DELAY)
        {
        }

        /// <summary>
        /// Constructs the client.
        /// </summary>
        /// <param name="remoteEp">The remote endpoint to use.</param>
        /// <param name="delay">Delay in milliseconds between attempts to connect.
        /// Specify 0 to not delay any more than necessary between attempts.</param>
        /// <exception cref="ArgumentException"><paramref name="delay"/> is negative.</exception>
        public MessageRouterConnection(IPEndPoint remoteEp, int delay)
        {
            if (delay < 0)
            {
                throw new ArgumentException("Invalid delay");
            }

            this.remoteEp = remoteEp;
            this.delay = delay;
            this.localPort = 0;
        }

        /// <summary>
        /// Helper to convert object state to a readable string.
        /// </summary>
        /// <returns>String with the remote end point and the current state.</returns>
        public override String ToString()
        {
            return "MessageRouterConnection(" + remoteEp + ", " + state + ")";
        }

        #region Socket event delegates

        /// <summary>Delegate to inform when a connection is established.</summary>
        public OnMessageRouterConnectDelegate OnConnect;

        /// <summary>Delegate to inform when a connection is closed.</summary>
        public OnMessageRouterDisconnectDelegate OnDisconnect;

        /// <summary>Delegate to inform when a packet is received.</summary>
        public OnMessageRouterPacketReceivedDelegate OnPacketReceived;

        #endregion

        #region Packet I/O

        /// <summary>
        /// Reads one packet (length, pipe, crypt format, pipe, payload) from a stream.
        /// </summary>
        /// <param name="s">Stream to read from.</param>
        /// <returns>The packet that was read.</returns>
        /// <exception cref="IOException">The stream is null or ended early, the length field
        /// is not a non-negative number, the length is 0 or exceeds
        /// <see cref="MessageRouterPacket.MAX_PACKET_LENGTH"/>, or a pipe separator is missing.</exception>
        public MessageRouterPacket ReadPacket(Stream s)
        {
            // Read message length, 5 bytes
            int len = ReadLen(s);
            if (len == 0)
            {
                throw new IOException("Message router packet len = 0");
            }
            if (len > MessageRouterPacket.MAX_PACKET_LENGTH)
            {
                throw new IOException("Message router packet len > Max packet size");
            }

            // Read pipe char
            if (!ReadPipe(s))
            {
                throw new IOException("Malformed router message");
            }

            // Read crypt format
            char cryptFormat = ReadCryptFormat(s);

            // Read pipe char
            if (!ReadPipe(s))
            {
                throw new IOException("Malformed router message");
            }

            byte[] buf = new byte[len];
            ReadBytes(s, buf);
            return new MessageRouterPacket(cryptFormat, buf);
        }

        /// <summary>
        /// Reads the fixed-width ASCII decimal length field.
        /// </summary>
        /// <param name="s">Stream to read from.</param>
        /// <returns>The decoded length (0 or greater).</returns>
        /// <exception cref="IOException">The field is not a non-negative integer.</exception>
        private int ReadLen(Stream s)
        {
            byte[] field = new byte[MessageRouterPacket.LEN_LENGTH];
            ReadBytes(s, field);

            string text = Encoding.ASCII.GetString(field);

            // TryParse copes with leading zeros, so an all-zero field decodes to 0 (and is then
            // rejected by ReadPacket) instead of failing with FormatException on an empty string.
            int len;
            bool parsed = int.TryParse(
                text,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out len);

            if (!parsed || len < 0)
            {
                throw new IOException("Malformed router message");
            }
            return len;
        }

        /// <summary>Reads the crypt format field and returns its first byte as a char.</summary>
        /// <param name="s">Stream to read from.</param>
        private char ReadCryptFormat(Stream s)
        {
            byte[] field = new byte[MessageRouterPacket.CRYPT_FORMAT_LENGTH];
            ReadBytes(s, field);
            return (char)field[0];
        }

        /// <summary>Reads one separator and reports whether it is the pipe character.</summary>
        /// <param name="s">Stream to read from.</param>
        /// <returns>True when the byte read is '|' (0x7C).</returns>
        private bool ReadPipe(Stream s)
        {
            byte[] field = new byte[MessageRouterPacket.PIPE_LENGTH];
            ReadBytes(s, field);
            return field[0] == 0x7C;
        }

        /// <summary>
        /// Fills the whole buffer from the stream, looping over partial reads.
        /// </summary>
        /// <param name="s">Socket data stream.</param>
        /// <param name="buf">Data buffer to fill completely.</param>
        /// <exception cref="IOException">The stream is null or ends before the buffer is full.</exception>
        private void ReadBytes(Stream s, byte[] buf)
        {
            if (s == null)
            {
                throw new IOException("Socket closed");
            }

            int total = buf.Length;
            int offset = 0;
            while (offset < total)
            {
                int count = s.Read(buf, offset, total - offset);
                if (count == 0)
                {
                    throw new IOException("Socket eof");
                }
                offset += count;
            }
        }

        /// <summary>Formats a payload length as a zero-padded decimal string.</summary>
        /// <param name="len">Payload length.</param>
        /// <returns>The text of the length field; longer than
        /// <see cref="MessageRouterPacket.LEN_LENGTH"/> when the value does not fit.</returns>
        private static string FormatLen(int len)
        {
            return len.ToString(System.Globalization.CultureInfo.InvariantCulture)
                      .PadLeft(MessageRouterPacket.LEN_LENGTH, '0');
        }

        /// <summary>Writes the fixed-width length field.</summary>
        /// <param name="s">Stream to write to.</param>
        /// <param name="len">Payload length.</param>
        /// <exception cref="IOException">The length does not fit the fixed-width field, or the stream is null.</exception>
        private void WriteLen(Stream s, int len)
        {
            string field = FormatLen(len);

            // A wider field would shift every following byte and corrupt the framing for the peer.
            if (field.Length > MessageRouterPacket.LEN_LENGTH)
            {
                throw new IOException("Message router packet len > Max packet size");
            }

            WriteBytes(s, Encoding.ASCII.GetBytes(field));
        }

        /// <summary>Writes the crypt format field.</summary>
        /// <param name="s">Stream to write to.</param>
        /// <param name="cryptFormat">Crypt format character; written as a single byte in the first position.</param>
        private void WriteCryptFormat(Stream s, char cryptFormat)
        {
            byte[] field = new byte[MessageRouterPacket.CRYPT_FORMAT_LENGTH];
            field[0] = (byte)cryptFormat;
            WriteBytes(s, field);
        }

        /// <summary>Writes the pipe separator.</summary>
        /// <param name="s">Stream to write to.</param>
        private void WritePipe(Stream s)
        {
            byte[] pipe = new byte[MessageRouterPacket.PIPE_LENGTH];
            pipe[0] = 0x7C;
            WriteBytes(s, pipe);
        }

        /// <summary>Writes a byte array into the socket stream.</summary>
        /// <param name="s">Stream to write to.</param>
        /// <param name="buf">Bytes to write.</param>
        /// <exception cref="IOException">The stream is null.</exception>
        private void WriteBytes(Stream s, byte[] buf)
        {
            if (s == null)
            {
                throw new IOException("Socket closed");
            }
            s.Write(buf, 0, buf.Length);
        }

        /// <summary>
        /// Writes a packet into the socket stream and flushes it.
        /// </summary>
        /// <param name="s">Data stream.</param>
        /// <param name="p">Data packet; a null payload is written as an empty payload.</param>
        private void WritePacket(Stream s, MessageRouterPacket p)
        {
            // The length is computed from the same array that is written, so a null payload
            // yields a consistent zero-length frame instead of a NullReferenceException.
            byte[] payload = p.buf != null ? p.buf : new byte[0];

            WriteLen(s, payload.Length);
            WritePipe(s);
            WriteCryptFormat(s, p.cryptFormat);
            WritePipe(s);
            WriteBytes(s, payload);
            s.Flush();
        }

        #endregion

        #region Socket state management

        enum State { CLOSED, OPENING, OPENED };

        /// <summary>Socket state.</summary>
        private State _state;

        /// <summary>
        /// Socket state accessor. Every assignment pulses the gate so that a worker waiting
        /// between connection attempts wakes up and re-evaluates the state.
        /// </summary>
        private State state
        {
            get { return _state; }
            set
            {
                lock (stateSync)
                {
                    _state = value;
                    Monitor.PulseAll(stateSync);
                }
            }
        }

        #endregion

        #region Connection Management

        /// <summary>TCP client object for this connection.</summary>
        private TcpClient tcpClient;

        /// <summary>Input stream for this TCP client.</summary>
        private Stream inputStream;

        /// <summary>Output stream for this TCP client.</summary>
        private Stream outputStream;

        /// <summary>Flag to indicate the TCP connection has been established at least once before.</summary>
        private bool reconnect;

        /// <summary>
        /// Validates the object before opening and moves it to the OPENING state.
        /// </summary>
        /// <exception cref="InvalidOperationException">The connection is not closed, no remote
        /// end point is set, or no packet-received delegate is set.</exception>
        private void PrepareToOpen()
        {
            if (state != State.CLOSED)
            {
                throw new InvalidOperationException("Socket already open");
            }
            if (remoteEp == null)
            {
                throw new InvalidOperationException("Invalid remote end point");
            }
            if (OnPacketReceived == null)
            {
                throw new InvalidOperationException("Missing deletegate for packet received event");
            }
            state = State.OPENING;
        }

        /// <summary>
        /// Opens the TCP connection, sets up the streams and notifies <see cref="OnConnect"/>.
        /// On any failure the partially opened connection is closed and the error is rethrown.
        /// </summary>
        /// <exception cref="InvalidOperationException">The state is not OPENING.</exception>
        private void OpenConnection()
        {
            lock (stateSync)
            {
                if (state != State.OPENING)
                {
                    throw new InvalidOperationException("Socket state not opening - " + state);
                }

                tcpClient = new TcpClient();
                try
                {
                    tcpClient.Connect(remoteEp);    // connect to the server
                    tcpClient.NoDelay = true;       // disable Nagle's algorithm

                    NetworkStream networkStream = tcpClient.GetStream();
                    inputStream = new BufferedStream(networkStream);
                    outputStream = new BufferedStream(networkStream);
                    state = State.OPENED;

                    IPEndPoint localEndPoint = tcpClient.Client.LocalEndPoint as IPEndPoint;
                    localPort = localEndPoint != null ? localEndPoint.Port : 0;

                    // Copy the delegate so a concurrent unsubscribe cannot null it between check and call.
                    OnMessageRouterConnectDelegate onConnect = OnConnect;
                    if (onConnect != null)
                    {
                        onConnect(this, reconnect); // notify connection is established
                    }
                    reconnect = true;               // show that we've connected at least once
                }
                catch (Exception e)
                {
                    CloseConnection(e);
                    throw;                          // rethrow without resetting the stack trace
                }
            }
        }

        /// <summary>
        /// Reads packets from the input stream and dispatches them to
        /// <see cref="OnPacketReceived"/> for as long as the state is OPENED.
        /// Returns immediately when there is no input stream.
        /// </summary>
        private void ReadConnection()
        {
            Stream xis = inputStream;
            if (xis == null)
            {
                return;
            }

            while (state == State.OPENED)
            {
                MessageRouterPacket p = ReadPacket(xis);

                OnMessageRouterPacketReceivedDelegate onPacket = OnPacketReceived;
                if (onPacket != null)
                {
                    onPacket(this, p);
                }
            }
        }

        /// <summary>
        /// Closes the streams and the TCP client (best effort) and notifies
        /// <see cref="OnDisconnect"/>. Does nothing when there is no TCP client.
        /// An OPENED state is moved back to OPENING so the asynchronous worker reconnects.
        /// </summary>
        /// <param name="e">Exception info passed on to <see cref="OnDisconnect"/>; may be null.</param>
        private void CloseConnection(Exception e)
        {
            lock (stateSync)
            {
                if (tcpClient == null)
                {
                    return;
                }

                // Close failures are deliberately ignored: the connection is being discarded anyway.
                if (inputStream != null)
                {
                    try { inputStream.Close(); } catch { }
                    inputStream = null;
                }
                if (outputStream != null)
                {
                    try { outputStream.Close(); } catch { }
                    outputStream = null;
                }
                try { tcpClient.Close(); } catch { }
                tcpClient = null;
                localPort = 0;

                if (state == State.OPENED)
                {
                    state = State.OPENING;  // force reconnect
                }

                OnMessageRouterDisconnectDelegate onDisconnect = OnDisconnect;
                if (onDisconnect != null)
                {
                    onDisconnect(this, e);
                }
            }
        }

        /// <summary>
        /// Permanently disposes of this object.
        /// </summary>
        public void Dispose()
        {
            Close(1);   // close and don't wait!
        }

        #endregion

        #region Asynchronous Session Management

        /// <summary>
        /// Starts the worker thread that connects, reads and reconnects until closed.
        /// </summary>
        /// <exception cref="InvalidOperationException">See <see cref="PrepareToOpen"/>.</exception>
        public void Start()
        {
            lock (stateSync)
            {
                PrepareToOpen();
                thread = StartThread(new ThreadStart(Run), "Run");
            }
        }

        /// <summary>
        /// Worker thread for connection management: connect, read until failure, close, repeat
        /// until the state becomes CLOSED.
        /// </summary>
        private void Run()
        {
            while (state != State.CLOSED)
            {
                OpenConnectionX();
                try
                {
                    ReadConnection();
                    CloseConnection(null);
                }
                catch (ThreadAbortException)
                {
                    // nothing
                }
                catch (Exception e)
                {
                    CloseConnection(e);
                }
            }
        }

        /// <summary>
        /// Retries opening the TCP connection until it succeeds or the state leaves OPENING,
        /// waiting <c>delay</c> ms (or until a state change) after each socket failure.
        /// </summary>
        private void OpenConnectionX()
        {
            while (state == State.OPENING)
            {
                lock (stateSync)
                {
                    // Re-check under the lock: Close() may have run between the loop test and
                    // taking the lock, and OpenConnection() throws when the state is not OPENING.
                    if (state != State.OPENING)
                    {
                        return;
                    }

                    try
                    {
                        OpenConnection();
                        return;
                    }
                    catch (SocketException)
                    {
                        if (state == State.OPENING)
                        {
                            Monitor.Wait(stateSync, delay);
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Closes the connection without waiting for the worker thread.
        /// </summary>
        public void Close()
        {
            Close(0);
        }

        /// <summary>
        /// Closes the connection and stops the worker thread.
        /// </summary>
        /// <param name="timeout">Time in milliseconds to wait for the worker thread to finish.</param>
        /// <exception cref="ArgumentException"><paramref name="timeout"/> is negative.</exception>
        public void Close(int timeout)
        {
            if (timeout < 0)
            {
                throw new ArgumentException("Invalid timeout value");
            }

            lock (stateSync)
            {
                if (state != State.CLOSED)
                {
                    state = State.CLOSED;
                    CloseConnection(new Exception("Socket closed"));
                }
            }

            // When called from the worker itself (e.g. from a callback) there is nothing to join.
            Thread worker = thread;
            if (worker != Thread.CurrentThread)
            {
                StopThread(worker, timeout);
                thread = null;
            }
        }

        #endregion

        #region Synchronous Session Management

        /// <summary>
        /// Opens the connection synchronously and starts a reader thread.
        /// </summary>
        /// <returns>True if the connection was established; false on a socket error.</returns>
        /// <exception cref="InvalidOperationException">See <see cref="PrepareToOpen"/>.</exception>
        public bool Open()
        {
            lock (stateSync)
            {
                try
                {
                    PrepareToOpen();
                    OpenConnection();
                    thread = StartThread(new ThreadStart(SyncReader), "SyncReader");
                    return true;
                }
                catch (SocketException e)
                {
                    state = State.CLOSED;
                    CloseConnection(e);
                    return false;
                }
            }
        }

        /// <summary>
        /// Reader thread logic for the synchronous mode: read until the loop ends or fails,
        /// then close for good (no reconnect).
        /// </summary>
        private void SyncReader()
        {
            try
            {
                ReadConnection();
                FinishSyncReader(null);
            }
            catch (Exception e)
            {
                FinishSyncReader(e);
            }
        }

        /// <summary>
        /// Marks the connection CLOSED and closes it, unless it is already CLOSED.
        /// </summary>
        /// <param name="e">Error that ended the reader, or null.</param>
        private void FinishSyncReader(Exception e)
        {
            lock (stateSync)
            {
                if (state != State.CLOSED)
                {
                    state = State.CLOSED;
                    CloseConnection(e);
                    thread = null;
                }
            }
        }

        #endregion

        #region Thread Management

        /// <summary>The worker thread, or null when none is running.</summary>
        private Thread thread;

        /// <summary>
        /// Starts a background worker thread.
        /// </summary>
        /// <param name="start">Main function for this worker thread.</param>
        /// <param name="what">Thread description, appended to the thread name.</param>
        /// <returns>The created thread.</returns>
        private Thread StartThread(ThreadStart start, string what)
        {
            Thread t = new Thread(start);
            t.IsBackground = true;
            t.Name = ToString() + " " + what;
            t.Start();
            return t;
        }

        /// <summary>
        /// Waits for the worker thread to finish and aborts it when it does not finish in time.
        /// </summary>
        /// <param name="t">Thread object to stop; null is ignored.</param>
        /// <param name="timeout">Time to wait in milliseconds.</param>
        private void StopThread(Thread t, int timeout)
        {
            if (t == null)
            {
                return;
            }

            if (!t.Join(timeout))
            {
                try
                {
                    t.Abort();
                }
                catch (PlatformNotSupportedException)
                {
                    // Thread.Abort is unavailable on this runtime. The socket has already been
                    // closed and the state set to CLOSED, so a worker blocked in Read or waiting
                    // between connection attempts leaves its loop on its own; it is also a
                    // background thread and cannot keep the process alive.
                }
            }
        }

        #endregion

        #region Outgoing Data

        /// <summary>
        /// Sends one packet. Writers are serialized against each other.
        /// </summary>
        /// <param name="cryptFormat">Crypt format character for the header.</param>
        /// <param name="data">Payload bytes.</param>
        /// <returns>True when the packet was written and flushed. False when the payload length
        /// does not fit the fixed-width length field (nothing is sent, the connection is kept)
        /// or when writing failed (the connection is closed).</returns>
        public bool Write(char cryptFormat, byte[] data)
        {
            // Reject oversized payloads up front: this is a caller error, not a connection
            // failure, so the connection must not be torn down because of it.
            if (data != null && FormatLen(data.Length).Length > MessageRouterPacket.LEN_LENGTH)
            {
                return false;
            }

            try
            {
                MessageRouterPacket p = new MessageRouterPacket(cryptFormat, data);
                lock (writeSync)
                {
                    WritePacket(outputStream, p);
                }
            }
            catch (Exception e)
            {
                CloseConnection(e);
                return false;
            }
            return true;
        }

        /// <summary>Gate serializing concurrent writers.</summary>
        private Object writeSync = new Object();

        #endregion
    }
}