Message passing, performance


I got some replies about the async event loop post mentioning the LMAX Disruptor and performance, so I decided to see for myself what the fuss was all about.

You can read about the LMAX Disruptor in detail elsewhere, but basically, it is a very fast single-process messaging library.

I wondered what that meant, so I wrote my own messaging library:

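using System.Collections.Generic;

// A minimal message bus: an unbounded Queue<T> guarded by a lock.
public class Bus<T>
{
    private readonly Queue<T> q = new Queue<T>();

    // Producer side: only takes the lock, never waits for space,
    // so the queue grows without limit if the reader falls behind.
    public void Enqueue(T msg)
    {
        lock (q)
        {
            q.Enqueue(msg);
        }
    }

    // Consumer side: returns false instead of waiting when the queue is empty.
    public bool TryDequeue(out T msg)
    {
        lock (q)
        {
            if (q.Count == 0)
            {
                msg = default(T);
                return false;
            }
            msg = q.Dequeue();
            return true;
        }
    }
}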

I think that you’ll agree that this is a thing of beauty and elegant coding. I then tested this with the following code:

using System;
using System.Diagnostics;

// Consumer: drains the bus for 10 seconds, then reports the throughput.
public static void Read(Bus<string> bus)
{
    int count = 0;
    var sp = Stopwatch.StartNew();
    while (sp.Elapsed.TotalSeconds < 10)
    {
        string msg;
        while (bus.TryDequeue(out msg))
        {
            count++;
        }
    }
    sp.Stop();

    Console.WriteLine("{0:#,#;;0} msgs in {1} for {2:#,#} ops/sec", count, sp.Elapsed, (count / sp.Elapsed.TotalSeconds));
}

// Producer: pushes "test" messages in batches of 1,000 for 10 seconds.
public static void Send(Bus<string> bus)
{
    var sp = Stopwatch.StartNew();
    while (sp.Elapsed.TotalSeconds < 10)
    {
        for (int i = 0; i < 1000; i++)
        {
            bus.Enqueue("test");
        }
    }
}

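var bus = new Bus<string>();

// Producer and consumer run on separate thread pool threads.
ThreadPool.QueueUserWorkItem(state => Send(bus));
ThreadPool.QueueUserWorkItem(state => Read(bus));

// Pool threads are background threads: keep the process alive until
// both 10-second loops have finished and the result has been printed.
Thread.Sleep(TimeSpan.FromSeconds(12));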

The result of this code?

145,271,000 msgs in 00:00:10.4597977 for 13,888,510 ops/sec

Now, what happens when we use TPL Dataflow’s BufferBlock<T> as the bus?

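using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Consumer: waits up to 5 ms for each message and counts what arrives in 10 seconds.
public static async Task ReadAsync(BufferBlock<string> bus)
{
    int count = 0;
    var sp = Stopwatch.StartNew();
    while (sp.Elapsed.TotalSeconds < 10)
    {
        try
        {
            await bus.ReceiveAsync(TimeSpan.FromMilliseconds(5));
            count++;
        }
        catch (TimeoutException)
        {
            // Nothing arrived within 5 ms: ReceiveAsync faults with a
            // TimeoutException in that case, so just poll again.
        }
    }
    sp.Stop();

    Console.WriteLine("{0:#,#;;0} msgs in {1} for {2:#,#} ops/sec", count, sp.Elapsed, (count / sp.Elapsed.TotalSeconds));
}

// Producer: sends "test" messages in batches of 1,000 for 10 seconds.
public static async Task SendAsync(BufferBlock<string> bus)
{
    var sp = Stopwatch.StartNew();
    while (sp.Elapsed.TotalSeconds < 10)
    {
        for (int i = 0; i < 1000; i++)
        {
            await bus.SendAsync("test");
        }
    }
}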

What we get is:

43,268,149 msgs in 00:00:10 for 4,326,815 ops/sec.

I then decided to check what happens with the .NET port of the LMAX Disruptor. Here is the code:

using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Disruptor;

// Event object preallocated in every ring buffer slot and reused.
public class Holder
{
    public string Val;
}

// Consumer: counts every event the Disruptor hands to it.
internal class CounterHandler : IEventHandler<Holder>
{
    public int Count;
    public void OnNext(Holder data, long sequence, bool endOfBatch)
    {
        Count++;
    }
}

static void Main(string[] args)
{
    // Ring buffer of 1,024 preallocated Holder instances; the handler runs on the default TaskScheduler.
    var disruptor = new Disruptor.Dsl.Disruptor<Holder>(() => new Holder(), 1024, TaskScheduler.Default);
    var counterHandler = new CounterHandler();
    disruptor.HandleEventsWith(counterHandler);

    var ringBuffer = disruptor.Start();

    var sp = Stopwatch.StartNew();
    while (sp.Elapsed.TotalSeconds < 10)
    {
        for (var i = 0; i < 1000; i++)
        {
            // Claim the next slot, fill it in place, then publish it to the consumer.
            long sequenceNo = ringBuffer.Next();

            ringBuffer[sequenceNo].Val = "test";

            ringBuffer.Publish(sequenceNo);
        }
    }
    Console.WriteLine("{0:#,#;;0} msgs in {1} for {2:#,#} ops/sec", counterHandler.Count, sp.Elapsed, (counterHandler.Count / sp.Elapsed.TotalSeconds));
}

And the resulting performance is:

29,791,996 msgs in 00:00:10.0003334 for 2,979,100 ops/sec

Now, I’ll be the first to agree that this is really, absolutely not even close to being a fair benchmark. It is testing wildly different things: the Disruptor uses a ring buffer, BufferBlock doesn’t, and the original Bus implementation just uses an unbounded queue.
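To make the "ring buffer versus unbounded queue" difference concrete, here is a minimal sketch of a bounded single-producer/single-consumer ring buffer. It is not the Disruptor's code and does not reproduce its API; SpscRingBuffer, TryEnqueue and TryDequeue are hypothetical names. It assumes exactly one producer thread, exactly one consumer thread, and a power-of-two capacity. The point is the shape: slots are preallocated, sequence numbers only ever grow, Volatile.Read/Volatile.Write order the slot write before the sequence update, and a full buffer makes the producer back off instead of letting memory grow.

```csharp
using System;
using System.Threading;

// Sketch only: a bounded, lock-free ring for ONE producer and ONE consumer.
// Hypothetical type, not the Disruptor implementation.
public sealed class SpscRingBuffer<T>
{
    private readonly T[] _slots;  // preallocated storage, reused for every message
    private readonly long _mask;  // capacity - 1, replaces a modulo on each access
    private long _head;           // next sequence to read; written only by the consumer
    private long _tail;           // next sequence to write; written only by the producer

    public SpscRingBuffer(int capacity)
    {
        if (capacity <= 0 || (capacity & (capacity - 1)) != 0)
            throw new ArgumentException("Capacity must be a positive power of two.", nameof(capacity));
        _slots = new T[capacity];
        _mask = capacity - 1;
    }

    // Producer only. Returns false when the ring is full.
    public bool TryEnqueue(T item)
    {
        long tail = _tail; // the producer owns _tail, so a plain read is fine
        if (tail - Volatile.Read(ref _head) == _slots.Length)
            return false; // full: caller must retry rather than grow memory

        _slots[tail & _mask] = item;
        // Release: the slot contents become visible before the new tail.
        Volatile.Write(ref _tail, tail + 1);
        return true;
    }

    // Consumer only. Returns false when the ring is empty.
    public bool TryDequeue(out T item)
    {
        long head = _head; // the consumer owns _head
        if (head == Volatile.Read(ref _tail))
        {
            item = default(T);
            return false; // empty
        }

        long index = head & _mask;
        item = _slots[index];
        _slots[index] = default(T); // drop the reference so it can be collected
        // Release: the slot is free for the producer only after this write.
        Volatile.Write(ref _head, head + 1);
        return true;
    }
}
```

A producer would retry (or spin) while TryEnqueue returns false, much as the Read loop above keeps polling TryDequeue. The Disruptor builds batching, configurable wait strategies and support for multiple consumers on top of this basic idea.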

But it is also a very telling benchmark, precisely because the difference doesn’t matter. What I need this for is network protocol handling, and even assuming that every single byte is a message, we would have to go far beyond what any reasonable pipe can be expected to handle.