My synchronization primitive: WaitForConsumersEvent

I mentioned that I had an issue with the producer having to wait for multiple consumers to finish consuming before it can continue producing. The numbers I'm talking about here are in the thousands, so WaitForMultipleObjects and WaitHandle.WaitAll() are not applicable (both are limited to waiting on 64 items at most).
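To make the limit concrete, here is a small sketch of what happens when WaitHandle.WaitAll() is handed too many handles. The class and method names (WaitAllLimitDemo, WaitAllRejects) are made up for illustration, and it assumes the call is made from an ordinary MTA thread, such as the main thread of a console application.

```csharp
using System;
using System.Threading;

public static class WaitAllLimitDemo
{
    // Hypothetical helper: returns true when WaitHandle.WaitAll refuses
    // to wait on the given number of handles.
    public static bool WaitAllRejects(int handleCount)
    {
        var handles = new WaitHandle[handleCount];
        for (int i = 0; i < handleCount; i++)
            handles[i] = new ManualResetEvent(true); // already signaled, so an accepted wait returns at once

        try
        {
            WaitHandle.WaitAll(handles);
            return false;
        }
        catch (NotSupportedException)
        {
            // Thrown when the array holds more handles than the system permits.
            return true;
        }
        finally
        {
            foreach (WaitHandle handle in handles)
                handle.Close();
        }
    }

    public static void Main()
    {
        Console.WriteLine("64 handles rejected: " + WaitAllRejects(64));
        Console.WriteLine("65 handles rejected: " + WaitAllRejects(65));
    }
}
```

With thousands of consumers, splitting the handles into groups of 64 and waiting on each group in turn would work, but it means allocating one kernel event per consumer, which is what I want to avoid.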

For the record, I don't usually write my own thread synchronization primitives, but it seems like there is no other good way to do this. This is a very simple wrapper around ManualResetEvent: the internal event is reset whenever the number of consumers is set to more than zero. Each time a consumer calls Set(), the counter is decremented, and when the counter reaches zero, the event is set.

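using System;
using System.Threading;

public class WaitForConsumersEvent : IDisposable
{
    // Number of consumers that still have to call Set().
    int numberOfConsumers;
    // Signaled once every consumer has called Set().
    ManualResetEvent doneWaitingEvent;

    public WaitForConsumersEvent(int numberOfConsumers)
    {
        // With zero consumers there is nothing to wait for, so start signaled.
        bool initialState = SetConsumers(numberOfConsumers);
        doneWaitingEvent = new ManualResetEvent(initialState);
    }

    // Blocks until all consumers are done.
    public bool WaitOne()
    {
        return doneWaitingEvent.WaitOne();
    }

    // Blocks until all consumers are done or the timeout elapses (returns false on timeout).
    public bool WaitOne(TimeSpan timeout)
    {
        return doneWaitingEvent.WaitOne(timeout, false);
    }

    // Called by each consumer when it finishes; the last one releases the producer.
    public void Set()
    {
        int val = Interlocked.Decrement(ref numberOfConsumers);
        if (val == 0)
            doneWaitingEvent.Set();
    }

    // Re-arms the event for a new batch. Call this before the consumers start.
    public void Reset(int numberOfConsumers)
    {
        if (SetConsumers(numberOfConsumers))
            doneWaitingEvent.Set();   // zero consumers: make sure nobody stays blocked
        else
            doneWaitingEvent.Reset();
    }

    public void Dispose()
    {
        doneWaitingEvent.Close();
    }

    // Stores the new count and returns true when there is nothing to wait for.
    private bool SetConsumers(int numberOfConsumers)
    {
        if (numberOfConsumers < 0)
            throw new ArgumentOutOfRangeException("numberOfConsumers", numberOfConsumers, "Should be zero or greater");
        this.numberOfConsumers = numberOfConsumers;
        return numberOfConsumers == 0;
    }
}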
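
Here is a rough sketch of how a producer might use this, not the actual code from my project. ProducerDemo and RunBatch are made-up names, the thread pool stands in for the real consumers, and the class is repeated in a condensed form only so that the sample compiles on its own.

```csharp
using System;
using System.Threading;

// Condensed copy of the class above, repeated only so this sample is self-contained.
public class WaitForConsumersEvent : IDisposable
{
    int numberOfConsumers;
    readonly ManualResetEvent doneWaitingEvent;

    public WaitForConsumersEvent(int numberOfConsumers)
    {
        this.numberOfConsumers = numberOfConsumers;
        doneWaitingEvent = new ManualResetEvent(numberOfConsumers == 0);
    }

    public bool WaitOne() { return doneWaitingEvent.WaitOne(); }

    public void Set()
    {
        if (Interlocked.Decrement(ref numberOfConsumers) == 0)
            doneWaitingEvent.Set();
    }

    public void Dispose() { doneWaitingEvent.Close(); }
}

public static class ProducerDemo
{
    // Hypothetical helper: queues consumerCount work items and blocks
    // until every one of them has called Set().
    public static int RunBatch(int consumerCount)
    {
        int processed = 0;
        using (var done = new WaitForConsumersEvent(consumerCount))
        {
            for (int i = 0; i < consumerCount; i++)
            {
                ThreadPool.QueueUserWorkItem(state =>
                {
                    Interlocked.Increment(ref processed); // stand-in for the real consuming work
                    done.Set();                           // tell the producer this consumer is finished
                });
            }

            done.WaitOne(); // a single wait, no matter how many consumers there are
        }
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(RunBatch(5000));
    }
}
```

The producer pays for one kernel event and one interlocked counter regardless of the number of consumers, which is the whole point compared to one wait handle per consumer.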