using System;
using System.Text;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace MessageRouter
{
public delegate void OnMessageRouterConnectDelegate(MessageRouterConnection client, bool reconnect);
public delegate void OnMessageRouterDisconnectDelegate(MessageRouterConnection client, Exception e);
public delegate void OnMessageRouterPacketReceivedDelegate(MessageRouterConnection client, MessageRouterPacket p);
///
/// Abstraction of Fusion Message Router Packet
///
/// Fusion Message Router Message Format:
/// |||||||||^
/// : 5 bytes fixed length field containing the length of the message from the till the ^ included.
/// : 1 byte fixed length field that describes the crypt format of the message. Fixed value '5'. This means no encryption.
///
///
public class MessageRouterPacket
{
public static int MAX_PACKET_LENGTH = 1024 * 1024;
public static int LEN_LENGTH = 5;
public static int CRYPT_FORMAT_LENGTH = 1;
public static int PIPE_LENGTH = 1;
public readonly char cryptFormat;
public readonly byte[] buf;
public MessageRouterPacket(char cryptFormat, byte[] buf)
{
this.cryptFormat = cryptFormat;
this.buf = buf;
}
}
public class MessageRouterConnection : IDisposable
{
/// Remote end point
private IPEndPoint remoteEp;
/// Port of local endpoint
private int localPort;
/// Connection delay
private readonly int delay;
/// The default delay between atttempts to connect (in ms).
private const int DEFAULT_DELAY = 5000;
#region Property accessor
public IPEndPoint RemoteEp
{
get { return remoteEp; }
set { remoteEp = value; }
}
public int LocalPort
{
get { return localPort; }
}
#endregion
///
/// Constructs the ipc client with default write queue length and default delay.
///
/// The remote endpoint to use. May be specified as
/// null and set later using assignment to RemoteEp.
public MessageRouterConnection(IPEndPoint remoteEp)
: this(remoteEp, DEFAULT_DELAY)
{
}
///
/// Construct IPC client
///
/// The remote endpoint to use.
/// Delay in milliseconds between attempts to connect.
/// specify 0 to not delay any more than necessary between attempts.
///
public MessageRouterConnection(IPEndPoint remoteEp, int delay)
{
if (delay < 0)
throw new ArgumentException("Invalid delay");
this.remoteEp = remoteEp;
this.delay = delay;
this.localPort = 0;
}
///
/// Helper to convert object state to readable string
///
/// String with object properties
public override String ToString()
{
return "MessageRouterConnection(" + remoteEp + ", " + state + ")";
}
#region Socket event delegates
/// Delegate to inform when a connection is established.
public OnMessageRouterConnectDelegate OnConnect;
/// Delegate to inform when a connection is closed.
public OnMessageRouterDisconnectDelegate OnDisconnect;
/// Delegate to inform when a packet is received.
public OnMessageRouterPacketReceivedDelegate OnPacketReceived;
#endregion
#region Packet I/O
///
/// Read packet from memory stream
///
/// Memory stream
/// Packet from memory stream
public MessageRouterPacket ReadPacket(Stream s)
{
// Read message length, 5 bytes
int len = ReadLen(s);
if (len == 0)
throw new IOException("Message router packet len = 0");
if (len > MessageRouterPacket.MAX_PACKET_LENGTH)
throw new IOException("Message router packet len > Max packet size");
// Read Pipe char
if (!ReadPipe(s))
throw new IOException("Malformed router message");
// Read crypto format
char cryptFormat = ReadCryptFormat(s);
// Read Pipe char
if (!ReadPipe(s))
throw new IOException("Malformed router message");
byte[] buf = new byte[len];
ReadBytes(s, buf);
return new MessageRouterPacket(cryptFormat, buf);
}
private int ReadLen(Stream s)
{
byte[] buf = new byte[MessageRouterPacket.LEN_LENGTH];
ReadBytes(s, buf);
Encoding encoding = new ASCIIEncoding();
//Console.WriteLine("READ LEN: " + encoding.GetString(buf));
String str = encoding.GetString(buf).TrimStart('0');
return Convert.ToInt32(str);
}
private char ReadCryptFormat(Stream s)
{
byte[] buf = new byte[MessageRouterPacket.CRYPT_FORMAT_LENGTH];
ReadBytes(s, buf);
char cf = (char)buf[0];
//Console.WriteLine("READ CRYPT: " + cf);
return cf;
}
private bool ReadPipe(Stream s)
{
byte[] buf = new byte[MessageRouterPacket.PIPE_LENGTH];
ReadBytes(s, buf);
if (buf[0] != 0x7C)
return false;
return true;
}
///
/// Read byte array from socket stream
///
/// Socket data stream
/// Data buffer
private void ReadBytes(Stream s, byte[] buf)
{
if (s == null)
throw new IOException("Socket closed");
int n = buf.Length;
int i = 0;
while (i < n)
{
int k = s.Read(buf, i, n - i);
if (k == 0)
throw new IOException("Socket eof");
i += k;
}
}
private void WriteLen(Stream s, int len)
{
byte[] buf = new byte[MessageRouterPacket.LEN_LENGTH];
String str = len.ToString();
int padding = MessageRouterPacket.LEN_LENGTH - str.Length;
for (int i = 0; i < padding; i++)
str = "0" + str;
System.Text.ASCIIEncoding encoding = new System.Text.ASCIIEncoding();
WriteBytes(s, encoding.GetBytes(str));
}
private void WriteCryptFormat(Stream s, char cryptFormat)
{
byte[] buf = new byte[MessageRouterPacket.CRYPT_FORMAT_LENGTH];
buf[0] = (byte)cryptFormat;
WriteBytes(s, buf);
}
private void WritePipe(Stream s)
{
byte[] pipe = new Byte[MessageRouterPacket.PIPE_LENGTH];
pipe[0] = 0x7C;
WriteBytes(s, pipe);
}
// Write byte array into socket stream
private void WriteBytes(Stream s, byte[] buf)
{
if (s == null)
throw new IOException("Socket closed");
s.Write(buf, 0, buf.Length);
}
///
/// Write packet into socket stream
///
/// Data stream
/// Data packet
private void WritePacket(Stream s, MessageRouterPacket p)
{
WriteLen(s, p.buf == null ? 0 : p.buf.Length);
WritePipe(s);
WriteCryptFormat(s, p.cryptFormat);
WritePipe(s);
WriteBytes(s, p.buf);
s.Flush();
}
#endregion
#region Socket state management
enum State { CLOSED, OPENING, OPENED };
/// Socket state
private State _state;
/// Socket state accessor
private State state
{
get
{
return _state;
}
set
{
lock (this)
{
_state = value;
Monitor.PulseAll(this);
}
}
}
#endregion
#region Connection Management
/// TCP client object for this IPC client
private TcpClient tcpClient;
/// Input stream for this TCP client
private Stream inputStream;
/// Output stream for this TCP client
private Stream outputStream;
/// Flag to indicate TCP connection is reconnected
private bool reconnect;
///
/// Validate socket state before opening
///
private void PrepareToOpen()
{
if (state != State.CLOSED)
throw new InvalidOperationException("Socket already open");
if (remoteEp == null)
throw new InvalidOperationException("Invalid remote end point");
if (OnPacketReceived == null)
throw new InvalidOperationException("Missing deletegate for packet received event");
state = State.OPENING;
}
///
/// Open a TCP connection to IPC server
///
private void OpenConnection()
{
lock (this)
{
if (state != State.OPENING)
throw new Exception("Socket state not opening - " + state);
tcpClient = new TcpClient();
try
{
tcpClient.Connect(remoteEp); // connect to IPC server
tcpClient.NoDelay = true; // disable nagle's algorithm
NetworkStream networkStream = tcpClient.GetStream();
inputStream = new BufferedStream(networkStream);
outputStream = new BufferedStream(networkStream);
state = State.OPENED;
IPEndPoint localEndPoint = tcpClient.Client.LocalEndPoint as IPEndPoint;
localPort = localEndPoint.Port;
if (OnConnect != null)
OnConnect(this, reconnect); // notify connection is established
reconnect = true; // show that we've connected at least once
}
catch (Exception e)
{
CloseConnection(e);
throw e;
}
}
}
///
/// Read data from input stream
///
private void ReadConnection()
{
Stream xis = inputStream;
if (xis == null)
return;
while (state == State.OPENED)
{
MessageRouterPacket p = ReadPacket(xis);
if (OnPacketReceived != null)
OnPacketReceived(this, p);
}
}
///
/// Close socket connection to IPC server
///
/// Exception info
private void CloseConnection(Exception e)
{
lock (this)
{
if (tcpClient != null)
{
if (inputStream != null)
{
try
{
inputStream.Close();
}
catch { }
inputStream = null;
}
if (outputStream != null)
{
try
{
outputStream.Close();
}
catch { }
outputStream = null;
}
try
{
tcpClient.Close();
}
catch { }
tcpClient = null;
localPort = 0;
if (state == State.OPENED)
state = State.OPENING; // force reconnect
if (OnDisconnect != null)
OnDisconnect(this, e);
}
}
}
///
/// Permanently disposes of this object.
///
public void Dispose()
{
Close(1); // close and don't wait!
}
#endregion
#region Asynchronous Session Management
///
/// Start socket connection thread
///
public void Start()
{
lock (this)
{
PrepareToOpen();
thread = StartThread(new ThreadStart(Run), "Run");
}
}
///
/// Worker thead for connection management
///
private void Run()
{
while (state != State.CLOSED)
{
OpenConnectionX();
try
{
ReadConnection();
CloseConnection(null);
}
catch (ThreadAbortException)
{ // nothing
}
catch (Exception e)
{
CloseConnection(e);
}
}
}
///
/// Open a TCP connection to IPC server until successful
///
private void OpenConnectionX()
{
while (state == State.OPENING)
{
lock (this)
{
try
{
OpenConnection();
return;
}
catch (SocketException)
{
if (state == State.OPENING)
Monitor.Wait(this, delay);
}
}
}
}
///
/// Close a socket connection without wait
///
public void Close()
{
Close(0);
}
///
/// Close a socket connection with timeout
///
/// Time to wait for socket to close
public void Close(int timeout)
{
if (timeout < 0)
throw new ArgumentException("Invalid timeout value");
lock (this)
{
if (state != State.CLOSED)
{
state = State.CLOSED;
CloseConnection(new Exception("Socket closed"));
}
}
if (thread != Thread.CurrentThread)
{
StopThread(thread, timeout);
thread = null;
}
}
#endregion
#region Synchronous Session Management
///
/// Open socket connection to IPC server synchronously
///
/// True if connection established
public bool Open()
{
lock (this)
{
try
{
PrepareToOpen();
OpenConnection();
thread = StartThread(new ThreadStart(SyncReader), "SyncReader");
return true;
}
catch (SocketException e)
{
state = State.CLOSED;
CloseConnection(e);
return false;
}
}
}
///
/// Worker thread logic to read data from input stream
///
private void SyncReader()
{
try
{
ReadConnection();
FinishSyncReader(null);
}
catch (Exception e)
{
FinishSyncReader(e);
}
}
///
/// Close socket connection to IPC server
///
///
private void FinishSyncReader(Exception e)
{
lock (this)
{
if (state != State.CLOSED)
{
state = State.CLOSED;
CloseConnection(e);
thread = null;
}
}
}
#endregion
#region Thread Management
/// The worker thead to create
private Thread thread;
///
/// Start worker thread
///
/// Main function for this worker thread
/// Thread description
/// The created thread
private Thread StartThread(ThreadStart start, string what)
{
Thread t = new Thread(start);
t.IsBackground = true;
t.Name = ToString() + " " + what;
t.Start();
return t;
}
///
/// Stop the worker thread
///
/// Thread object to stop
/// Time to wait
private void StopThread(Thread t, int timeout)
{
if (t == null)
return;
if (!t.Join(timeout))
t.Abort();
}
#endregion
#region Outgoing Data
public bool Write(char cryptFormat, byte[] data)
{
try
{
MessageRouterPacket p = new MessageRouterPacket(cryptFormat, data);
lock (writeSync)
{
WritePacket(outputStream, p);
}
}
catch (Exception e)
{
CloseConnection(e);
return false;
}
return true;
}
private Object writeSync = new Object();
#endregion
}
}