ThreadedBus.cs 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. namespace Wayne.Lib.MessageBus
  6. {
  7. /// <summary>
  8. /// A threaded bus implementation. The publish method
  9. /// will execute all subscribe actions.
  10. /// </summary>
  11. public class ThreadedBus : IBus
  12. {
  13. readonly object subscriptionsLock = new object();
  14. readonly List<SubscriptionBase> subscriptions = new List<SubscriptionBase>();
  15. public virtual void Publish(string topic, object message)
  16. {
  17. SubscriptionBase[] iterationSubscriptions;
  18. lock (subscriptionsLock)
  19. {
  20. iterationSubscriptions = subscriptions.ToArray();
  21. }
  22. iterationSubscriptions
  23. .Where(x => Any.Match(topic, x.TopicFilter))
  24. .Where(x => x.Type.IsInstanceOfType(message))
  25. .ForEach(x =>
  26. {
  27. ThreadPool.QueueUserWorkItem(y => x.Invoke(topic, message));
  28. });
  29. }
  30. public virtual IDisposable Subscribe<T>(string topicFilter, Action<string, T> action)
  31. {
  32. var subscription = new Subscription<T>()
  33. {
  34. TopicFilter = topicFilter,
  35. InvokeAction = action
  36. };
  37. lock (subscriptionsLock)
  38. {
  39. subscriptions.Add(subscription);
  40. }
  41. return new ActionDisposable(() =>
  42. {
  43. lock (subscriptionsLock)
  44. {
  45. subscriptions.Remove(subscription);
  46. }
  47. });
  48. }
  49. }
  50. }