Async Read Challenge
Originally posted at 11/11/2010
Here is how you are supposed to read from a stream:
var buffer = new byte[bufferSize];
int read = -1;
int start = 0;
while (read != 0)
{
    read = stream.Read(buffer, start, buffer.Length - start);
    start += read;
}
The reason we do it this way is that the stream might not have all the data available yet, so a single Read call can return fewer bytes than we asked for.
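To make the partial-read behaviour concrete, here is a minimal sketch. ChunkedStream, PartialReadDemo, ReadAll and ReadOnce are hypothetical names invented for this illustration; they are not part of the post or the framework. The wrapper simply caps every Read call at a few bytes, which is roughly how a network stream can behave.

```csharp
using System;
using System.IO;

// Hypothetical wrapper (an assumption for this example): returns at most
// maxChunk bytes per Read call, imitating a stream that delivers data in pieces.
public sealed class ChunkedStream : Stream
{
    private readonly Stream inner;
    private readonly int maxChunk;

    public ChunkedStream(Stream inner, int maxChunk)
    {
        this.inner = inner;
        this.maxChunk = maxChunk;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Never hand back more than maxChunk bytes, even if more were requested.
        return inner.Read(buffer, offset, Math.Min(count, maxChunk));
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}

public static class PartialReadDemo
{
    // The loop from the post: keep reading until Read returns 0.
    public static int ReadAll(Stream stream, byte[] buffer)
    {
        int read = -1;
        int start = 0;
        while (read != 0)
        {
            read = stream.Read(buffer, start, buffer.Length - start);
            start += read;
        }
        return start;
    }

    // Counterexample: a single Read call, which may stop short.
    public static int ReadOnce(Stream stream, byte[] buffer)
    {
        return stream.Read(buffer, 0, buffer.Length);
    }
}
```

With a 10-byte MemoryStream wrapped in a ChunkedStream capped at 3 bytes and a 16-byte buffer, ReadOnce reports 3 bytes while ReadAll reports all 10.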
The question is how to do this in an asynchronous manner. Asynchronous loops are… tough, to say the least. Mostly because you have to handle the state explicitly.
Here is how you can do this using the new Async CTP in C# 5.0:
private async static Task<Tuple<byte[], int>> ReadBuffer(Stream s, int bufferSize)
{
    var buffer = new byte[bufferSize];
    int read = -1;
    int start = 0;
    while (read != 0)
    {
        read = await Task.Factory.FromAsync<byte[], int, int, int>(
            s.BeginRead, s.EndRead, buffer, start, buffer.Length - 1, null);
        start += read;
    }
    return Tuple.Create(buffer, start);
}
Now, what I want to see is using just the TPL API, and without C# 5.0 features, can you write the same thing?
Comments
public static Task <tuple<byte[],>
{
<tuple<byte[],>
<task<int> cont = null;
<byte[],> (
}
I've had to do something like that years ago.
I don't have that code anymore, but I think my solution looked similar to this (sorry, no TPL there):
class BufferReader
{
}
Usage of that class: create an instance, attach event handlers, then call BeginRead().
BTW, your code is wrong, you should use «buffer.Length - start» instead of «buffer.Length - 1».
The C# 5.0 async usage looks so ... ugly to me. I cannot articulate why though, it just seems rather esoteric.
Here is a version that uses Rx for comparison:
http://pastebin.com/ay2Ncnbg
For more context, you should read the following blog post (from which the code was copied):
blogs.msdn.com/.../...ystem-io-stream-reading.aspx
Search for "Sequential Asynchronous Workflows in Silverlight using Coroutines" or "Rob Eisenberg's MVVM presentation from MIX10", which introduces that idea.
+1 Omar
ThreadPool.QueueUserWorkItem(
() => {
});
Except you won't get the benefits of IOCP.
You might want to look at this too, Ayende:
http://easyasync.codeplex.com/
This was inspired over 6 years back from the good old state threads library.
Now we have the Async CTP, node.js, and Tornado, all dealing with non-blocking I/O...
Also do see the links at bottom...
I might be wrong, but isn't the C# 5 version pretty much the same as:
var buffer = new byte[bufferSize];
int read = -1;
int start = 0;
while(read != 0)
{
var ar = stream.BeginRead(buffer, start, buffer.Length - start, null, null);
read = stream.EndRead(ar);
start += read;
}
Code will block on await, here it will block on EndRead, no?
Using Jeff Richter's AsyncEnumerator class
void Main()
{
}
private static IEnumerator<Int32> ProcessFile(AsyncEnumerator ae, string pathName, Int32 bufferSize)
{
}
@Mihailo,
The C#5 code will not block on await. If a result is immediately available, it will continue, but if it is not, it signs the rest of the method up in a continuation and returns to the caller. The continuation will be called when the result becomes available. See Eric Lippert's excellent series of explanations.
blogs.msdn.com/.../c_2300_+5-0/
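The "sign the rest of the method up as a continuation" idea can also be written by hand with the TPL, without await. The following is only a sketch under stated assumptions: TplReader, ReadBufferTpl and ReadNext are hypothetical names, and this is one possible shape, not the compiler's actual generated code. Each call to ReadNext plays the role of one loop iteration, and ContinueWith registers what would otherwise come after the await.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class TplReader
{
    // Hypothetical helper: same result shape as ReadBuffer in the post
    // (the buffer plus the number of bytes read), but without async/await.
    public static Task<Tuple<byte[], int>> ReadBufferTpl(Stream s, int bufferSize)
    {
        var tcs = new TaskCompletionSource<Tuple<byte[], int>>();
        var buffer = new byte[bufferSize];
        ReadNext(s, buffer, 0, tcs);
        return tcs.Task; // returned to the caller before the reads finish
    }

    // One "loop iteration": start a read, then register the rest of the loop
    // as a continuation instead of blocking on the result.
    private static void ReadNext(Stream s, byte[] buffer, int start,
        TaskCompletionSource<Tuple<byte[], int>> tcs)
    {
        Task<int> readTask;
        try
        {
            readTask = Task<int>.Factory.FromAsync<byte[], int, int>(
                s.BeginRead, s.EndRead, buffer, start, buffer.Length - start, null);
        }
        catch (Exception e)
        {
            tcs.TrySetException(e);
            return;
        }

        readTask.ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                tcs.TrySetException(t.Exception.InnerExceptions);
                return;
            }
            if (t.IsCanceled)
            {
                tcs.TrySetCanceled();
                return;
            }

            int read = t.Result; // the task is complete here, so this does not block
            if (read == 0)
            {
                // End of stream (or buffer full): publish the final result.
                tcs.TrySetResult(Tuple.Create(buffer, start));
                return;
            }

            // Otherwise continue with the next iteration.
            ReadNext(s, buffer, start + read, tcs);
        });
    }
}
```

The caller gets a task back immediately and can attach its own continuation or read Result later. Note that in this sketch the continuation is scheduled through the current task scheduler, which is typically the thread pool, so the explicit state handling that await hides is all visible here.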
@Harry Steinhilber
Thanks for that, second installment should have explained it but it doesn't work for me. It explains how it's made but it becomes too technical for my level of knowledge of C#5 features. If you look at Jon Skeet's post:
msmvps.com/.../...-investigating-control-flow.aspx
He explains it as if to an 8-year-old. And yes, you're correct, it's a bit different, but for the caller of the ReadBuffer method. The code I laid out is a bit of an oversimplification, and to work the same way as it would with async and await it needs collaboration from the method's caller.
I believe this code will have the same effect as the C# 5 version:
private static Tuple<byte[], int> ReadBuffer(Stream stream, int bufferSize)
{
var buffer = new byte[bufferSize];
int read = -1;
int start = 0;
while(read != 0)
{
var ar = stream.BeginRead(buffer, start, buffer.Length - start, null, null);
read = stream.EndRead(ar);
start += read;
}
return Tuple.Create(buffer, start);
}
Func <stream,>
var ar = readBuffer.BeginInvoke(null, null);
... do something ...
var data = readBuffer.EndInvoke(ar);
I understand this seems uglier, especially for the caller, but that's probably the reason why the new language features were introduced. Also, I understand this code is not a 1-to-1 equivalent; I'm just saying it will have the same effect for the example given by Oren.
@Mihailo
I think you're missing the point. The C# 5.0 async features execute on the same thread, whereas calling BeginInvoke/EndInvoke executes the logic asynchronously on a ThreadPool thread.
This is a very important difference and is the main purpose of the async/await language features, which allow you to easily develop composable async / non-blocking logic on the same thread.
As soon as you introduce threading you introduce concurrency issues as well as additional overhead of context switching between threads.
@Demis Bellot
I'm not arguing that async/await isn't different, or better, that's why I said it will have nearly the same effect - in this particular example - not in general.
Mind, though, that you may still have code executed on a different thread; you may still have a ThreadPool thread in the background (or something similar). How do you think the continuation is executed? How do you think callbacks are executed? Language additions do not make multi-threading magically disappear; they just make it easier for us to develop and manage it.
@Mihailo
This is incorrect, and it is the main point of C# 5.0 async/await: no secret background thread is spawned to achieve the behaviour. It's all done by the compiler wrapping the logic that follows in a callback, which is executed as part of a generated state machine (similar to how yield works):
"Whenever a task is "awaited", the remainder of the current method is signed up as a continuation of the task, and then control immediately returns to the caller. When the task completes, the continuation is invoked and the method starts up where it was before."
So no background threads - all your logic is executed on the same thread.
I recommend the following post for some more detailed info:
blogs.msdn.com/.../asynchrony-in-c-5-part-one.aspx
As well as Anders' talk on the subject:
player.microsoftpdc.com/.../1b127a7d-300e-4385-...
@Demis
We might be talking about different things here. The default implementation of Stream.BeginRead will execute:
return delegate2.BeginInvoke(buffer, offset, count, callback, state);
That's output from Reflector, so there will be a new thread involved.
I'm just guessing that what you're referring to is my line:
var ar = readBuffer.BeginInvoke(null, null);
The compiler will not create a new thread like I did there.
Hi @Mihailo
I was just saying that calling a delegate concurrently like this:
var ar = readBuffer.BeginInvoke(null, null);
Executes this on a ThreadPool thread.
Here is a version that uses the Task Iterator provided in the TPL Samples:
http://code.msdn.microsoft.com/ParExtSamples
Task<byte[]> ReadAsync(Stream stream, int length)
IEnumerable<Task> ReadBufferIterator(Stream stream, byte[] buffer, int length)
Task<int> readTask = stream.ReadAsync(buffer, offset, length - offset);
Task<int> ReadAsync(this Stream stream, byte[] buffer, int offset, int length)
return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, offset, length, null);