Pipes and filters: The multi threaded version
Another advantage of using the pipes and filters approach is that it is naturally thread safe. Only a single filter is handling an item at a given time, even if the pipes are all running on multiple threads.
This means that we have a very simple way to introduce threading into the system, without any hassle whatsoever. Let us take a look at the single threaded pipeline:
using System.Collections.Generic;

public class SingleThreadedPipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public SingleThreadedPipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        // Chain the operations; each one wraps the output of the one before it.
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            current = operation.Execute(current);
        }
        // Nothing has run yet. Enumerating the final sequence pulls items through every stage.
        IEnumerator<T> enumerator = current.GetEnumerator();
        while (enumerator.MoveNext()) ;
    }
}
Very simple, except the last line, which is what pushes the entire pipeline along. Now, what do we need in order to make this multi threaded?
Well, what do I mean when I talk about multi threading? I mean that we will execute all the operations concurrently, so they can process different parts of the pipeline at the same time. This allows us to make better use of our computing resources.
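Before answering that, a quick aside on that last line. The sketch below is only an illustration: the IOperation<T> interface is inferred from how the pipeline calls it, and GenerateNumbers, PrintNumbers and LazyPipelineDemo are hypothetical names that do not come from the post. It records what happens into a log, so you can see that chaining the filters runs nothing, and that the items only start to flow once the final sequence is enumerated.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of the interface, inferred from the pipeline code.
public interface IOperation<T>
{
    IEnumerable<T> Execute(IEnumerable<T> input);
}

// Hypothetical source filter: ignores its input and yields 1, 2, 3.
public class GenerateNumbers : IOperation<int>
{
    private readonly List<string> log;
    public GenerateNumbers(List<string> log) { this.log = log; }

    public IEnumerable<int> Execute(IEnumerable<int> input)
    {
        for (int i = 1; i <= 3; i++)
        {
            log.Add("generating " + i);
            yield return i;
        }
    }
}

// Hypothetical pass-through filter: records each item it sees.
public class PrintNumbers : IOperation<int>
{
    private readonly List<string> log;
    public PrintNumbers(List<string> log) { this.log = log; }

    public IEnumerable<int> Execute(IEnumerable<int> input)
    {
        foreach (int i in input)
        {
            log.Add("printing " + i);
            yield return i;
        }
    }
}

public static class LazyPipelineDemo
{
    public static List<string> Run()
    {
        List<string> log = new List<string>();

        // Chaining the iterators does not execute any filter body.
        IEnumerable<int> current = new List<int>();
        current = new GenerateNumbers(log).Execute(current);
        current = new PrintNumbers(log).Execute(current);
        log.Add("built");

        // Enumerating the last stage pulls each item through every stage in turn.
        using (IEnumerator<int> enumerator = current.GetEnumerator())
        {
            while (enumerator.MoveNext()) { }
        }
        return log;
    }
}
```

Calling LazyPipelineDemo.Run() should return "built" first, followed by alternating "generating" and "printing" entries, because each item travels through both filters before the next one is produced.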
Here is the code:
using System;
using System.Collections.Generic;
using System.Threading;

public class ThreadPoolPipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public ThreadPoolPipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            // Each stage gets its own thread pool work item and feeds the next stage.
            IEnumerable<T> execute = operation.Execute(current);
            current = StartConsuming(execute);
        }
        IEnumerator<T> enumerator = current.GetEnumerator();
        while (enumerator.MoveNext()) ;
    }

    private ThreadSafeEnumerator<T> StartConsuming(IEnumerable<T> enumerable)
    {
        ThreadSafeEnumerator<T> threadSafeEnumerator = new ThreadSafeEnumerator<T>();
        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                foreach (T t in enumerable)
                {
                    threadSafeEnumerator.AddItem(t);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
            finally
            {
                // Always signal completion, so the consumer is not left waiting forever.
                threadSafeEnumerator.MarkAsFinished();
            }
        });
        return threadSafeEnumerator;
    }
}
We are using ThreadSafeEnumerator here, and pass a callback to the thread pool which will execute the previous part of the pipeline and push its items into the current one.
This is just an advanced version of decorators.
The implementation of ThreadSafeEnumerator is about as simple as multi threaded code can be:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

public class ThreadSafeEnumerator<T> : IEnumerable<T>, IEnumerator<T>
{
    private bool active = true;
    private readonly Queue<T> cached = new Queue<T>();
    private T current;

    public IEnumerator<T> GetEnumerator()
    {
        return this;
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<T>)this).GetEnumerator();
    }

    public T Current
    {
        get { return current; }
    }

    public void Dispose()
    {
        cached.Clear();
    }

    public bool MoveNext()
    {
        lock (cached)
        {
            // Block until the producer adds an item or marks the sequence as finished.
            while (cached.Count == 0 && active)
                Monitor.Wait(cached);

            if (active == false && cached.Count == 0)
                return false;

            current = cached.Dequeue();
            return true;
        }
    }

    public void Reset()
    {
        throw new NotSupportedException();
    }

    object IEnumerator.Current
    {
        get { return Current; }
    }

    public void AddItem(T item)
    {
        lock (cached)
        {
            cached.Enqueue(item);
            Monitor.Pulse(cached);
        }
    }

    public void MarkAsFinished()
    {
        lock (cached)
        {
            active = false;
            Monitor.Pulse(cached);
        }
    }
}
The real magic happens in MoveNext(), with support from AddItem() and MarkAsFinished().
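For comparison only, here is a minimal sketch of the same blocking hand-off built on BlockingCollection<T>, a swapped-in technique that assumes .NET 4 or later and so was not an option when this post was written. BlockingCollectionSketch and Run are hypothetical names. Add() plays the role of AddItem(), CompleteAdding() plays the role of MarkAsFinished(), and the sequence returned by GetConsumingEnumerable() blocks in the same way MoveNext() does above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingCollectionSketch
{
    // Enumerates `source` on a thread pool thread and returns a sequence
    // that blocks until items arrive or the producer finishes.
    public static IEnumerable<T> StartConsuming<T>(IEnumerable<T> source)
    {
        BlockingCollection<T> buffer = new BlockingCollection<T>();
        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                foreach (T item in source)
                {
                    buffer.Add(item);        // counterpart of AddItem()
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
            finally
            {
                buffer.CompleteAdding();     // counterpart of MarkAsFinished()
            }
        });
        return buffer.GetConsumingEnumerable();
    }

    // Hypothetical usage: pushes three numbers through one threaded stage.
    public static List<int> Run()
    {
        List<int> result = new List<int>();
        foreach (int i in StartConsuming(new[] { 1, 2, 3 }))
        {
            result.Add(i);
        }
        return result;
    }
}
```

With a single producer per stage, as in the pipeline above, the items should come out in the order they were added. The hand-written version is still useful for seeing what Monitor.Wait and Monitor.Pulse are doing underneath.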
That is it; these two classes are all we need to make everything else multi threaded.
Note that this version assumes that you can execute all the operations concurrently, which may not be the case if you have a lot of them (more than 25 per CPU, the thread pool's default limit). At that point, we would need to implement coroutines for the ThreadSafeEnumerator, instead of just blocking the thread.
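One way to guard against that, sketched here as an assumption rather than as part of the original design, is to compare the number of registered operations with the limit reported by ThreadPool.GetMaxThreads before starting. ThreadPoolLimitCheck and FitsInThreadPool are hypothetical names.

```csharp
using System;
using System.Threading;

public static class ThreadPoolLimitCheck
{
    // Returns true when a pipeline with `stageCount` blocking stages
    // fits within the thread pool's maximum number of worker threads.
    public static bool FitsInThreadPool(int stageCount)
    {
        int workerThreads;
        int completionPortThreads;
        ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
        return stageCount <= workerThreads;
    }
}
```

This check is deliberately rough: it ignores other work already queued on the pool, so a pipeline that passes it can still end up waiting for threads.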
More posts in "Pipes and filters" series:
- (06 Jan 2008) The multi threaded version
- (05 Jan 2008) The IEnumerable approach
Take a look at Google's map-reduce. It really resembles what you showed here, especially the multi threaded version, except that they use it for massive computations on grids of computers. I think that with a little remoting or WCF your pipeline implementation can do something similar.
Forgot to put this link:
For the map reduce...
In my book Practical .NET2 and C#2 I explain how to code pipeline with iterators of C#2 (keyword yield and yield break). The result is super concise syntax. For example:
And also how to compute prime numbers with pipes with not even 20 lines of code.
The whole chapter is available as an article here:
Having rediscovered LINQ as pipes and filters, I think you've now rediscovered Parallel LINQ :)
Seriously, it would be worth looking into LINQ further, and the Parallel Extensions. There's a CTP available at
Without in any way wishing to cast aspersions on your multithreading abilities, the guys behind Parallel LINQ have spent a long time on it and are seriously smart on concurrency. I'm sure they'd value your input, too.
If I never have to write multi threaded code again, I'll be very happy.
See my previous response to pipelines & linq in the previous thread.
I don't think that you can scale the syntax to be appropriate.
I would love to see myself proved wrong, however.
I've said it before but I'll say it again: you really need to go catch up on the ParallelFX libraries. The CTP is available and there are great videos on Channel9. You'll find parallel for loops, parallel foreach, parallel LINQ (over objects at the least, not sure about the others), tasks, futures, all with configurable parallelization, work-stealing, ordering, etc. It is well worth checking out, if only to help distinguish your goals and approach from theirs. It'd help us readers for that reason, too. ;)
Hi Ayende,
I like your pipe and filter implementation, it's super concise and effective.
But I'm curious how you intend to implement forks and joins into your pipe and filter pattern. I'd like to see you try this out :)
See my latest post about Rhino ETL for the answer.