ReverseAsyncRead()
Rx includes the AsyncRead() method, which reads a stream from the beginning and returns an IObservable<byte[]>. ReverseAsyncRead() does the same in reverse: it starts at the end of the stream (or at startingPosition) and works toward the front. The public overloads accept a stream and, optionally, a bufferSize and a startingPosition.
public static IObservable<byte[]> ReverseAsyncRead (this Stream stream) {
    // default to a 64 KB buffer (2 << 15 == 65,536 bytes)
    return ReverseAsyncRead (stream, 2 << 15);
}

public static IObservable<byte[]> ReverseAsyncRead (
    this Stream stream,
    int bufferSize) {
    ValidateParameters (stream, bufferSize);
    // start at the end of the stream
    return Observable.Iterate<byte[]> (observer =>
        ReverseAsyncRead (stream, bufferSize, observer, stream.Length));
}

public static IObservable<byte[]> ReverseAsyncRead (
    this Stream stream,
    int bufferSize,
    long startingPosition) {
    ValidateParameters (stream, bufferSize);
    if (0 > startingPosition)
        throw new ArgumentOutOfRangeException ("startingPosition");
    // start at the caller-supplied position instead of the end
    return Observable.Iterate<byte[]> (observer =>
        ReverseAsyncRead (stream, bufferSize, observer, startingPosition));
}
Using IEnumerable<IObservable<object>>, yield, Observable.FromAsyncPattern(), and Observable.Iterate(), we can read a stream in blocks of bufferSize bytes from the end to the front. Each public overload validates its arguments and then calls Iterate(), which passes an observer instance to the private ReverseAsyncRead() iterator shown below, after the ValidateParameters() helper:
static void ValidateParameters (Stream stream, int bufferSize) {
    if (null == stream)
        throw new ArgumentNullException ("stream");
    if (!stream.CanSeek)
        throw new ArgumentOutOfRangeException ("stream", "stream.CanSeek is false");
    if (1 > bufferSize)
        throw new ArgumentOutOfRangeException ("bufferSize");
}

static IEnumerable<IObservable<object>> ReverseAsyncRead (
    Stream stream,
    int bufferSize,
    IObserver<byte[]> observer,
    long startingPosition) {
    Func<byte[], int, int, IObservable<int>> asyncRead =
        Observable.FromAsyncPattern<byte[], int, int, int> (
            stream.BeginRead,
            stream.EndRead);
    byte[] buffer = new byte[bufferSize];
    long position = startingPosition;
    int read = bufferSize;
    do {
        // decrement position by the amount read
        // (assumes each read returned the full count requested)
        position -= read;
        if (position < 0) {
            // fewer than bufferSize bytes remain in front of the previous
            // position; request exactly that many, starting at offset 0
            bufferSize = (int)(position + read);
            position = 0;
        }
        // set stream's position
        stream.Position = position;
        // call stream's Begin/End Read
        {
            ListObservable<int> asyncReadResult;
            try {
                asyncReadResult = asyncRead (buffer, 0, bufferSize).Start ();
            }
            catch (Exception exception) {
                observer.OnError (exception);
                yield break;
            }
            yield return asyncReadResult;
            read = asyncReadResult.Value;
        }
        // send value read to observer
        try {
            // buffer is reused on the next iteration, so a subscriber that
            // keeps a block beyond OnNext must copy it
            if (buffer.Length == read)
                observer.OnNext (buffer);
            else
                // partial block: send only the bytes actually read
                observer.OnNext (buffer.Take (read).ToArray ());
        }
        catch (Exception exception) {
            observer.OnError (exception);
            yield break;
        }
        // continue until we can't read anymore
    } while (read > 0 && position > 0);
    observer.OnCompleted ();
}
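Using yield and Stream’s async method calls, we can observe every byte[] block in the stream, from the last block to the first. Only the order of the blocks is reversed; the bytes within each block stay in their original order.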
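To make the block arithmetic concrete, here is a minimal synchronous sketch of the same end-to-front walk. It is not the Rx implementation above: the names ReverseBlocks and ReadBlocksBackwards are hypothetical, it uses blocking Stream.Read() calls instead of BeginRead/EndRead, and it allocates a fresh array per block rather than reusing one buffer. It only illustrates how the position and block size should evolve when the stream length is not a multiple of bufferSize.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class ReverseBlocks
{
    // Hypothetical helper (not part of Rx): yields blocks from the end of a
    // seekable stream toward the front, synchronously.
    public static IEnumerable<byte[]> ReadBlocksBackwards(Stream stream, int bufferSize)
    {
        long position = stream.Length;
        while (position > 0)
        {
            // The front-most block may be shorter than bufferSize.
            int count = (int)Math.Min(bufferSize, position);
            position -= count;
            stream.Position = position;

            // Stream.Read may return fewer bytes than requested,
            // so loop until the block is full.
            var block = new byte[count];
            int offset = 0;
            while (offset < count)
            {
                int n = stream.Read(block, offset, count - offset);
                if (n == 0)
                    throw new EndOfStreamException();
                offset += n;
            }
            yield return block;
        }
    }

    public static void Main()
    {
        var data = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        using (var stream = new MemoryStream(data))
        {
            // Prints:
            // 6,7,8,9
            // 2,3,4,5
            // 0,1
            foreach (var block in ReadBlocksBackwards(stream, 4))
                Console.WriteLine(string.Join(",", block));
        }
    }
}
```

With ten bytes and a block size of four, the last block read is the two bytes at the front of the stream, which is the case the position < 0 branch in the iterator has to handle.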