To demonstrate the different async options in .NET, I wrote the following samples that call the async read methods of the .NET FileStream. Each sample uses a different programming model:
- APM (Asynchronous Programming Model): the original BeginXxx/EndXxx callback pattern
- TPL (Task Parallel Library): wraps APM into a non-blocking Task continuation chain
- TDF (TPL Dataflow): models the read as a producer/consumer block pipeline
- Rx (Reactive Extensions): models the async reads as an observable sequence
Each sample reads into a buffer and delegates processing to BufferProcessor:
void BufferProcessor(byte[] buffer)
{
    // process the buffer contents
}
All four complete in roughly the same amount of time. I am biased toward Rx, so to my eye it is the most elegant sample of the four.
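The samples below rely on a fileStream and a __bufferSize that are not shown. A minimal setup might look like the following sketch. The 4096-byte buffer size, the temp file, and the SetupSketch and OpenForAsyncRead names are assumptions made for illustration and are not part of the original samples.

```csharp
using System;
using System.IO;

static class SetupSketch
{
    // Assumed value; the original __bufferSize is not shown.
    const int __bufferSize = 4096;

    // Opens a file for asynchronous reads. FileOptions.Asynchronous requests
    // asynchronous (overlapped) I/O from the OS where the platform supports it.
    public static FileStream OpenForAsyncRead(string path) =>
        new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read,
                       __bufferSize, FileOptions.Asynchronous);

    static void Main()
    {
        // Hypothetical input: a temp file with some bytes so the sketch can run.
        var path = Path.GetTempFileName();
        File.WriteAllBytes(path, new byte[10000]);

        using (var fileStream = OpenForAsyncRead(path))
        {
            Console.WriteLine(fileStream.CanRead);
        }
        File.Delete(path);
    }
}
```

Each sample that follows could then run inside the using block, with fileStream and __bufferSize in scope.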
APM sample
var buffer = new byte[__bufferSize];
var flag = new ManualResetEventSlim();
Action read = null;
read = () => fileStream.BeginRead(buffer, 0, __bufferSize, ar =>
{
    var bytesRead = fileStream.EndRead(ar);
    if (bytesRead == 0)
    {
        flag.Set(); // EOF: release the waiting thread
        return;
    }
    BufferProcessor(buffer);
    read(); // issue the next read
}, null);
read();
flag.Wait();
An Action delegate calls BeginRead, passing an anonymous callback. Inside the callback, EndRead returns the number of bytes read. If it’s zero the wait handle is signaled and the callback returns; otherwise the buffer is processed and the delegate calls itself to continue the loop.
TPL sample
var buffer = new byte[__bufferSize];
Func<Task<int>> readAsync = () =>
    Task<int>.Factory.FromAsync(
        fileStream.BeginRead, fileStream.EndRead,
        buffer, 0, __bufferSize, null);
Func<Task> loop = null;
loop = () => readAsync().ContinueWith(task =>
{
    if (task.Result == 0)
        return (Task)task; // EOF: hand back the completed read task
    BufferProcessor(buffer);
    return loop(); // chain the next read
}).Unwrap();
Task sample = loop();
sample.Wait();
Task<int>.Factory.FromAsync wraps the APM pair into a Task<int>. A recursive Func<Task> builds a non-blocking continuation chain — each ContinueWith processes the buffer and recurses, returning the completed task at EOF. Unwrap flattens the nested Task<Task> at each level so the returned Task sample represents the entire operation. sample.Wait() blocks only once, after all reads complete.
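To see the Unwrap step in isolation, here is a small sketch that uses the same recursive continuation shape without any file I/O. The CountDown method, its parameters, and the 10 ms delay are hypothetical stand-ins for the read loop, not part of the original sample.

```csharp
using System;
using System.Threading.Tasks;

static class UnwrapSketch
{
    // Recurses through continuations until 'remaining' reaches zero,
    // returning how many asynchronous steps were taken.
    public static Task<int> CountDown(int remaining, int steps = 0)
    {
        if (remaining == 0)
            return Task.FromResult(steps);

        // A Task-returning lambda makes ContinueWith produce Task<Task<int>>.
        Task<Task<int>> nested = Task.Delay(10)
            .ContinueWith(_ => CountDown(remaining - 1, steps + 1));

        // Unwrap yields a Task<int> that completes when the inner task does.
        return nested.Unwrap();
    }

    static void Main()
    {
        Task<int> whole = CountDown(3);
        whole.Wait();                    // blocks once, after every step has run
        Console.WriteLine(whole.Result); // prints 3
    }
}
```

Without Unwrap, waiting on the outer task would only wait for the first continuation to return, not for the recursion it started.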
TDF sample
var buffer = new byte[__bufferSize];
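// Declared before assignment so the lambda can refer to reader;
// a var initializer cannot reference the variable it declares.
TransformBlock<int, byte[]> reader = null;
reader = new TransformBlock<int, byte[]>(async _ =>
{
    var bytesRead = await Task<int>.Factory.FromAsync(
        fileStream.BeginRead, fileStream.EndRead,
        buffer, 0, __bufferSize, null);
    if (bytesRead == 0)
    {
        reader.Complete(); // EOF: accept no further read requests
        return null;
    }
    return buffer;
});
var processor = new ActionBlock<byte[]>(data =>
{
    BufferProcessor(data);
    reader.Post(0); // request the next read
});
reader.LinkTo(processor,
    new DataflowLinkOptions { PropagateCompletion = true },
    data => data != null);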
reader.LinkTo(DataflowBlock.NullTarget<byte[]>());
reader.Post(0);
processor.Completion.Wait();
A TransformBlock acts as the producer — it reads from the FileStream and outputs the buffer. An ActionBlock acts as the consumer — it receives each buffer, calls BufferProcessor, and posts back to the reader to request the next read. When EOF is reached, the reader calls Complete() and outputs null, which the predicate filter routes to NullTarget. Completion propagates from the reader to the processor via PropagateCompletion. The calling thread blocks on processor.Completion.Wait().
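The filtered link with a NullTarget fallback described above can be tried on its own. The sketch below is an illustration under assumptions: it uses integers instead of buffers, an even-number test stands in for the non-null check, and the LinkFilterSketch and Run names are hypothetical. On .NET Framework, System.Threading.Tasks.Dataflow comes from a NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

static class LinkFilterSketch
{
    public static List<int> Run(IEnumerable<int> inputs)
    {
        var seen = new List<int>();

        // Producer: passes each number through unchanged.
        var source = new TransformBlock<int, int>(n => n);

        // Consumer: handles one message at a time by default, so no lock is needed.
        var sink = new ActionBlock<int>(n => seen.Add(n));

        // The first link accepts only messages matching the predicate
        // and forwards completion to the consumer.
        source.LinkTo(sink,
            new DataflowLinkOptions { PropagateCompletion = true },
            n => n % 2 == 0);

        // The second link discards whatever the first one declined, so a
        // rejected message does not sit at the head of the output queue.
        source.LinkTo(DataflowBlock.NullTarget<int>());

        foreach (var n in inputs)
            source.Post(n);
        source.Complete();

        sink.Completion.Wait();
        return seen;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(",", Run(new[] { 1, 2, 3, 4 }))); // prints 2,4
    }
}
```

Links are offered messages in the order they were created, which is why the filtered link has to be registered before the NullTarget one.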
Rx sample
var buffer = new byte[__bufferSize];
var readAsync = Observable.FromAsyncPattern<byte[], int, int, int>(
    fileStream.BeginRead, fileStream.EndRead);
var sequence = Observable.Defer(() => readAsync(buffer, 0, __bufferSize))
    .Repeat()
    .TakeWhile(bytesRead => bytesRead > 0);
sequence.ForEach(_ => BufferProcessor(buffer));
The observable query is defined and assigned to sequence — nothing executes yet. Observable.Defer lazily invokes the read factory on each subscription, and Repeat re-subscribes after each completion — together they form the read loop. TakeWhile terminates the sequence when bytesRead hits zero. sequence.ForEach subscribes to kick off the stream, blocking the calling thread and calling BufferProcessor for each read.
Conclusion
Each successive model reduces the amount of boilerplate required to express the same async operation. APM demands explicit callback wiring, a wait handle, and recursive self-invocation. TPL wraps the callbacks into a non-blocking Task continuation chain with Unwrap, but still requires recursion. TDF separates the concern into a producer/consumer pipeline with built-in completion propagation. Rx goes the furthest by modeling the entire operation as a declarative pipeline — no wait handles, no loops, no recursion, and no self-posting.
C# 5 introduced async/await, which simplifies all of the above. The APM callbacks, TPL continuations, and TDF self-posting collapse into: while ((bytesRead = await readAsync()) > 0) BufferProcessor(buffer); There is no Wait(), just await.
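As a fuller sketch of that loop, the following program reads a file to the end with await. It makes several assumptions: Stream.ReadAsync (available since .NET 4.5) is used in place of the readAsync wrapper, the buffer size is 4096, the input is a temp file, and the AwaitSketch and ReadAllAsync names are hypothetical. An async Main needs C# 7.1 or later.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class AwaitSketch
{
    // Assumed value; the original __bufferSize is not shown.
    const int __bufferSize = 4096;

    // Reads the stream to the end, handing the buffer to the callback after
    // each read, and returns the total number of bytes read.
    public static async Task<long> ReadAllAsync(Stream stream, Action<byte[]> bufferProcessor)
    {
        var buffer = new byte[__bufferSize];
        long total = 0;
        int bytesRead;
        while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            bufferProcessor(buffer);
            total += bytesRead;
        }
        return total;
    }

    static async Task Main()
    {
        // Hypothetical input file so the sketch can run on its own.
        var path = Path.GetTempFileName();
        File.WriteAllBytes(path, new byte[10000]);

        using (var fileStream = new FileStream(path, FileMode.Open, FileAccess.Read,
                   FileShare.Read, __bufferSize, FileOptions.Asynchronous))
        {
            long total = await ReadAllAsync(fileStream, _ => { });
            Console.WriteLine(total); // prints 10000
        }
        File.Delete(path);
    }
}
```

The loop reads like its synchronous counterpart, and the caller awaits the returned task instead of blocking on a wait handle.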