using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace Wayne.Lib.MessageBus
{
/// <summary>
/// A threaded bus implementation. The publish method queues the action of every
/// matching subscription on the thread pool.
/// </summary>
public class ThreadedBus : IBus
{
    // Guards every read and write of the subscription list below.
    readonly object subscriptionsLock = new object();

    // NOTE(review): no element type is spelled out for this list, yet its contents are
    // copied into a SubscriptionBase[] when publishing — confirm the intended type arguments.
    readonly List subscriptions = new List();

    /// <summary>
    /// Offers <paramref name="message"/> to the current subscriptions. A subscription is
    /// selected when <c>Any.Match</c> accepts the topic against its topic filter and the
    /// message is an instance of the subscription's type. Each selected subscription is
    /// invoked from its own thread pool work item.
    /// </summary>
    /// <param name="topic">Topic the message is published under.</param>
    /// <param name="message">Payload handed to each selected subscription.</param>
    public virtual void Publish(string topic, object message)
    {
        // Filtering and queuing work on a private copy, so the lock is not held
        // while subscribers are being selected and dispatched.
        SubscriptionBase[] snapshot = SnapshotSubscriptions();

        snapshot
            .Where(candidate => Any.Match(topic, candidate.TopicFilter)
                                && candidate.Type.IsInstanceOfType(message))
            .ForEach(recipient =>
            {
                ThreadPool.QueueUserWorkItem(state => recipient.Invoke(topic, message));
            });
    }

    /// <summary>
    /// Registers <paramref name="action"/> under <paramref name="topicFilter"/>.
    /// </summary>
    /// <param name="topicFilter">Filter stored on the new subscription and compared against published topics.</param>
    /// <param name="action">Callback stored on the new subscription.</param>
    /// <returns>A handle whose callback removes this subscription from the bus.</returns>
    public virtual IDisposable Subscribe(string topicFilter, Action action)
    {
        var registration = new Subscription();
        registration.TopicFilter = topicFilter;
        registration.InvokeAction = action;

        lock (subscriptionsLock)
        {
            subscriptions.Add(registration);
        }

        // The returned handle captures the registration so it can take it out of the list later.
        return new ActionDisposable(() =>
        {
            lock (subscriptionsLock)
            {
                subscriptions.Remove(registration);
            }
        });
    }

    // Copies the current subscriptions into an array while holding the lock.
    SubscriptionBase[] SnapshotSubscriptions()
    {
        lock (subscriptionsLock)
        {
            return subscriptions.ToArray();
        }
    }
}
}