Pipes and filters: The multi threaded version

Another advantage of using the pipes and filters approach is that it is naturally thread safe. Only a single filter is handling an item at a given time, even if the pipes are all running on multiple threads.

This means that we have a very simple way to introduce threading into the system, without any hassle whatsoever. Let us take a look at the single threaded pipeline:

public class SingleThreadedPipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public SingleThreadedPipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            current = operation.Execute(current);
        }
        // Draining the final enumerator is what pulls items through the
        // whole (lazy) pipeline.
        IEnumerator<T> enumerator = current.GetEnumerator();
        while (enumerator.MoveNext()) ;
    }
}

Very simple, except for the last line, which is what pushes the entire pipeline along. Now, what do we need in order to make this multi threaded?

Well, what do I mean by multi threaded? I mean that we will execute all the operations concurrently, so each one can process a different part of the pipeline at the same time. This allows us to make better use of our computing resources.
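Both pipelines in this post work against the IOperation<T> contract from the previous post in the series; as a reminder, it boils down to roughly this (only the Execute signature matters here):

public interface IOperation<T>
{
    // A filter: takes a stream of items and yields a stream of items
    IEnumerable<T> Execute(IEnumerable<T> input);
}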

Here is the multi threaded pipeline:

public class ThreadPoolPipeline<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public ThreadPoolPipeline<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public void Execute()
    {
        IEnumerable<T> current = new List<T>();
        foreach (IOperation<T> operation in operations)
        {
            IEnumerable<T> execute = operation.Execute(current);
            current = StartConsuming(execute);
        }
        IEnumerator<T> enumerator = current.GetEnumerator();
        while (enumerator.MoveNext()) ;
    }
    private ThreadSafeEnumerator<T> StartConsuming(IEnumerable<T> enumerable)
    {
        ThreadSafeEnumerator<T> threadSafeEnumerator = new ThreadSafeEnumerator<T>();
        // Each stage gets its own thread pool work item, which enumerates the
        // previous stage and feeds its results into a ThreadSafeEnumerator.
        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                foreach (T t in enumerable)
                {
                    threadSafeEnumerator.AddItem(t);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
            finally
            {
                threadSafeEnumerator.MarkAsFinished();
            }
        });
        return threadSafeEnumerator;
    }
}

We are using a ThreadSafeEnumerator here, and pass a callback to the thread pool which will execute the previous part of the pipeline and push its results into the current part.

This is just an advanced version of the decorator pattern.

The implementation of ThreadSafeEnumerator is about as simple as multi threaded code can be:

public class ThreadSafeEnumerator<T> : IEnumerable<T>, IEnumerator<T>
{
    private bool active = true;
    private readonly Queue<T> cached = new Queue<T>();
    private T current;

    public IEnumerator<T> GetEnumerator()
    {
        return this;
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<T>)this).GetEnumerator();
    }

    public T Current
    {
        get { return current; }
    }

    public void Dispose()
    {
        cached.Clear();
    }

    public bool MoveNext()
    {
        lock (cached)
        {
            // Block until the producer either adds an item or marks the
            // enumerator as finished.
            while (cached.Count == 0 && active)
                Monitor.Wait(cached);

            // Finished and fully drained: signal the end of the enumeration.
            if (active == false && cached.Count == 0)
                return false;

            current = cached.Dequeue();

            return true;
        }
    }

    public void Reset()
    {
        throw new NotSupportedException();
    }

    object IEnumerator.Current
    {
        get { return Current; }
    }

    public void AddItem(T item)
    {
        lock (cached)
        {
            cached.Enqueue(item);
            // Wake up a consumer waiting in MoveNext()
            Monitor.Pulse(cached);
        }
    }

    public void MarkAsFinished()
    {
        lock (cached)
        {
            active = false;
            // Wake up the consumer so it can observe that we are done
            Monitor.Pulse(cached);
        }
    }
}

The real magic happens in MoveNext(), with support from AddItem() and MarkAsFinished().
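To see that blocking behavior in isolation, here is a small, hypothetical snippet (not from the pipeline itself): a background thread produces items while the main thread consumes them, with the foreach blocking in MoveNext() until an item arrives or the producer marks the enumerator as finished.

ThreadSafeEnumerator<int> enumerator = new ThreadSafeEnumerator<int>();

ThreadPool.QueueUserWorkItem(delegate
{
    for (int i = 0; i < 5; i++)
    {
        enumerator.AddItem(i);   // Monitor.Pulse wakes the waiting consumer
        Thread.Sleep(100);       // simulate slow production
    }
    enumerator.MarkAsFinished(); // lets MoveNext() return false once drained
});

foreach (int i in enumerator)    // blocks in MoveNext() until items arrive
    Console.WriteLine(i);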

This is it; these two classes are all we need to make everything else multi threaded.
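For example, wiring up the multi threaded pipeline with a couple of trivial operations could look like this (the operation names here are illustrative, made up for this example):

// First stage: ignores its (empty) input and produces values
public class GenerateNumbers : IOperation<int>
{
    public IEnumerable<int> Execute(IEnumerable<int> input)
    {
        for (int i = 0; i < 10; i++)
            yield return i;
    }
}

// Later stage: consumes the previous stage's ThreadSafeEnumerator
public class PrintNumbers : IOperation<int>
{
    public IEnumerable<int> Execute(IEnumerable<int> input)
    {
        foreach (int i in input)
        {
            Console.WriteLine(i);
            yield return i;
        }
    }
}

// Each operation runs on its own thread pool thread:
new ThreadPoolPipeline<int>()
    .Register(new GenerateNumbers())
    .Register(new PrintNumbers())
    .Execute();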

Note that this version assumes that you can execute all the operations concurrently, which may not be the case if you have a lot of them (over 25 per CPU by default). At that point, we would need to implement coroutines for the ThreadSafeEnumerator, instead of just blocking the thread.

More posts in "Pipes and filters" series:

  1. (06 Jan 2008) The multi threaded version
  2. (05 Jan 2008) The IEnumerable approach