Async event loops in C#
I’m designing a new component, and I want to reduce the complexity involved in dealing with it. This is a networked component, and after designing several of those, I wanted to remove one area of complexity: explicitly concurrent code. Because of that, I decided to go with the following architecture:
The network code just reads messages from the network and puts them in an in-memory queue. Then we have a single threaded event loop that simply goes over the queue and processes those messages.
All of the code that actually processes messages is single threaded, which makes it oh so much easier to work with.
Now, I can do this quite easily with a BlockingCollection<T>, which is how I have usually done this sort of thing so far. It is simple, robust and easy to understand. It also ties down a full thread for the event loop, which can be a shame if you don’t get a lot of messages.
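For comparison, here is a minimal sketch of what the blocking variant might look like. The class name, the delegate parameters and the timeout parameter are hypothetical and only there for illustration; the one BlockingCollection<T> call it relies on is TryTake with a millisecond timeout and a cancellation token.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class BlockingEventLoop
{
    // Hypothetical helper: runs on the calling thread until the queue is
    // completed and drained, or until the token is cancelled.
    public static void Run(
        BlockingCollection<object> queue,
        Action<object> processMessage,
        Action noMessagesInTimeout,
        TimeSpan timeout,
        CancellationToken token)
    {
        while (!queue.IsCompleted)
        {
            object msg;
            try
            {
                // Blocks this thread for up to `timeout` waiting for a message.
                if (!queue.TryTake(out msg, (int)timeout.TotalMilliseconds, token))
                {
                    // TryTake also returns false once the queue is completed
                    // and empty, so only report a real timeout.
                    if (!queue.IsCompleted)
                        noMessagesInTimeout();
                    continue;
                }
            }
            catch (OperationCanceledException)
            {
                break; // the token was cancelled
            }
            processMessage(msg);
        }
    }
}
```

The whole loop is synchronous, so the thread that calls Run is parked inside TryTake whenever the queue is empty, which is the cost described above.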
So I decided to experiment with async approaches. In particular, using the BufferBlock<T> from the TPL Dataflow library.
I came up with the following code:
var cts = new CancellationTokenSource();
var q = new BufferBlock<object>(new DataflowBlockOptions
{
    // Cancelling this token completes the whole block.
    CancellationToken = cts.Token,
});
This just creates the buffer block, but the nice thing here is that I can set up a “global” cancellation token for all operations on it. The problem is that this actually generates bad exceptions (InvalidOperationException, instead of TaskCanceledException). Well, I’m not sure if bad is the right term, but it isn’t the one I would expect here, at least. If you pass a cancellation token directly to the method, you get the behavior I expected.
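A small sketch that makes the difference visible. The class and method names are hypothetical; each method starts a pending ReceiveAsync, cancels, and returns the name of the exception type that the await throws. It assumes the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CancellationDemo
{
    // Token supplied through the block options: cancelling completes the
    // block itself, so the pending receive sees a source with no more data.
    public static async Task<string> CancelViaBlockOptions()
    {
        var cts = new CancellationTokenSource();
        var q = new BufferBlock<int>(new DataflowBlockOptions
        {
            CancellationToken = cts.Token,
        });
        Task<int> pending = q.ReceiveAsync();
        cts.Cancel();
        return await DescribeFailure(pending);
    }

    // Token supplied to ReceiveAsync: only that receive is cancelled,
    // the block itself stays usable.
    public static async Task<string> CancelViaReceive()
    {
        var cts = new CancellationTokenSource();
        var q = new BufferBlock<int>();
        Task<int> pending = q.ReceiveAsync(cts.Token);
        cts.Cancel();
        return await DescribeFailure(pending);
    }

    private static async Task<string> DescribeFailure(Task<int> pending)
    {
        try
        {
            await pending;
            return "no exception";
        }
        catch (Exception e)
        {
            return e.GetType().Name;
        }
    }
}
```

Comparing the two returned names should reproduce the observation above: the first variant surfaces as InvalidOperationException, the second as TaskCanceledException.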
At any rate, the code for the event loop now looks like this:
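private static async Task EventLoop(BufferBlock<object> bufferBlock, CancellationToken cancellationToken)
{
    while (true)
    {
        object msg;
        try
        {
            // Wait up to 3 seconds for the next message without blocking a thread.
            msg = await bufferBlock.ReceiveAsync(TimeSpan.FromSeconds(3), cancellationToken);
        }
        catch (TimeoutException)
        {
            // Nothing arrived in time; let the caller react, then keep waiting.
            NoMessagesInTimeout();
            continue;
        }
        catch (Exception)
        {
            // Cancellation or a completed block: stop the loop.
            break;
        }
        ProcessMessage(msg);
    }
}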
And that is pretty much it. We have a good way to handle timeouts and process messages, and we don’t take up a thread. We can also be easily cancelled. I still need to run this through a lot more testing, in particular to verify that this doesn’t cause issues when we need to debug this sort of system, but it looks promising.
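To show how the pieces might fit together, here is a self-contained sketch that feeds such a loop and then shuts it down. Everything outside the ReceiveAsync call is an assumption made for the example: the class name, passing the handlers in as delegates, and using Complete() to end the loop once the queued messages are drained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EventLoopDemo
{
    // Same shape as the loop above, with the handlers passed in as
    // delegates (hypothetical) so the example compiles on its own.
    public static async Task EventLoop(
        BufferBlock<object> bufferBlock,
        Action<object> processMessage,
        Action noMessagesInTimeout,
        CancellationToken cancellationToken)
    {
        while (true)
        {
            object msg;
            try
            {
                msg = await bufferBlock.ReceiveAsync(TimeSpan.FromSeconds(3), cancellationToken);
            }
            catch (TimeoutException)
            {
                noMessagesInTimeout();
                continue;
            }
            catch (Exception)
            {
                break; // cancelled, or the block completed with nothing left
            }
            processMessage(msg);
        }
    }

    public static async Task<List<object>> RunAsync()
    {
        var cts = new CancellationTokenSource();
        var queue = new BufferBlock<object>();
        var seen = new List<object>();

        // Start the loop; it awaits its first message and returns control here.
        Task loop = EventLoop(queue, seen.Add, () => { }, cts.Token);

        // Stand-in for the network code: Post can be called from any thread.
        queue.Post("hello");
        queue.Post(42);

        // Completing the block lets the loop drain what is queued and then exit.
        queue.Complete();
        await loop;
        return seen;
    }
}
```

Calling cts.Cancel() instead of Complete() would also end the loop through the same catch block, but any messages still sitting in the queue at that point would not be processed.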
Comments
This looks like a perfect match for Disruptor with MultiProducerSequencer. If you want to get performance, it's the best you can do.
And the second case. That's the way EventStore works with its handlers/actors. There's no explicit concurrency. Just an in-memory message bus and queued handlers for async message processing. Simple & powerful.
Out of curiosity: why give a cancellation token to the buffer block constructor if you provide it again to the ReceiveAsync method?
Why are you using the timeout option, and what does this timeout stand for? I'm currently implementing a similar solution, but we don't use the timeout since we couldn't find more precise info about what it is for.
Could you use ReactiveExtensions for this? Subscribe to the messages from the network and process them as you need when they come through. It seems like it would fit really well with what you're trying to do and is inherently non-blocking.
Did you consider the LMAX disruptor (there is a .NET port)? This is a good use case for it.
Scooletz, a) There doesn't appear to be any C# version. b) I have many potential producers (network events), but a single consumer. No need to go complex here. It is very unlikely to actually be required.
Scooletz, Yep, that is a very powerful model. The problem begins when you actually do want concurrency. That is the point when you start talking about multiple event queues, etc.
Itzhak, Because providing it in the ctor gives a different error than providing it to the receive. I like the receive error better (TaskCanceledException rather than InvalidOperationException).
Gabriel, Because for my scenario, I need to also do something when there is a timeout. For example, accept N messages or up until 1 second, then do something.
Craig, That seems like a LOT of work to do to get something that I can do in a very few lines of code.
So you listen for up to 3 seconds, and if no message arrives in that time you need to do something? Just to know, if a message arrives in this time, is the timeout reset?
Gabriel, Yes, that is the basic idea. The actual thing would probably be a bit more complex, and involve two event loops, but I wanted to see that I can do that. Basically, if we don't get a message within a given time frame, that means that we failed, and we need to retry our operation. Think about heartbeat scenarios.
Rx would be a perfect fit for asynchronously handling streams of data. There's a clear separation between the concurrency model (Schedulers) and data handling. In a previous job we implemented our network stack entirely on Rx and it was a huge win. Oren, I've been following your excellent blog for many years and admire your technical skills. Yet for some reason you seem to be missing out completely on the Rx framework. I know it took me a while to "grok" it, but it totally paid off. If you enjoy LINQ, you'll love Rx. Nowadays I work at Google and I no longer write in C#. The one thing I miss the most about C# & .NET is Rx.
Omer, I've used RX extensively in several places in RavenDB. In particular, our changes API is meant to deal with that directly. That said, I don't want schedulers. I just want to process events. And I need to handle that in a way that is simple to debug. A pull model is drastically simpler to work with than a push model.
I was intimidated by the LMAX Disruptor as well and thought it would be a lot of work, but it's really only 10 lines of code and a class per event handler to get it up and going. I was pleasantly surprised. I love TPL dataflow too - but I prefer it for modeling dataflow, not high performance single threaded event processing. The disruptor pattern handles that quite well, and includes a good pipeline abstraction that allows for event listeners to work in parallel as well, so it is quite flexible.
I understand the rationale of simplicity using the pull based model over a push model. However, if timeout scheduling is only related to specific events (e.g. arrival of transport messages), whereas other operations should also be processed as event messages in the same event loop, timeout management becomes a bit messy. Perhaps explicitly scheduling timeouts (and other delayed actions if appropriate) as event messages would make this cleaner than using a timeout on the "Receive".
In terms of a push based model, I agree with Scooletz that the queued handlers / in-memory bus model used in e.g. EventStore is pretty nice as well, as it allows for several (monitored) composition / decoration scenarios with a fairly simple paradigm and limited amount of infrastructure.
Craig, Take a look here: http://ayende.com/blog/167361/message-passing-performance?key=8428297b7f964928bf6416239eff1a1b
Good points - it all depends on your context, namely your expected input/output and inner work characteristics. I would probably reach for TPL dataflow because its rich semantics provide simpler modeling, as well as the good infrastructure of the underlying TPL. I would probably use the Disruptor if I had a fairly limited domain, my message handlers were fast and I wanted to maximize throughput. A simple queue may work for an entry point, but the minute I wanted to implement even the simplest dataflow scenario, a simple framework such as TPL dataflow or the disruptor would have a big return on maintainability, and in some cases, performance.