Skip to main content
  1. Posts/

ReverseAsyncRead()

··2 mins

Rx includes the AsyncRead() method, which reads a stream from the beginning and returns an IObservable<byte[]>. ReverseAsyncRead() does the same, but in reverse. The public overloads accept a stream and, optionally, a bufferSize and a startingPosition.

/// <summary>
/// Reads <paramref name="stream"/> backwards in blocks, using the default
/// block size of 65,536 bytes (<c>2 &lt;&lt; 15</c>).
/// </summary>
/// <param name="stream">The stream to read; forwarded unchanged to the
/// (stream, bufferSize) overload, which performs the argument validation.</param>
/// <returns>The observable produced by the (stream, bufferSize) overload.</returns>
public static IObservable<byte[]> ReverseAsyncRead (this Stream stream) {
    const int defaultBufferSize = 2 << 15; // 65,536

    return ReverseAsyncRead (stream, defaultBufferSize);
}

/// <summary>
/// Reads <paramref name="stream"/> backwards, starting from its end, in
/// blocks of <paramref name="bufferSize"/> bytes.
/// </summary>
/// <param name="stream">A non-null, seekable stream.</param>
/// <param name="bufferSize">Block size in bytes; must be at least 1.</param>
/// <returns>An observable built by <c>Observable.Iterate</c> over the private
/// ReverseAsyncRead iterator.</returns>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">The stream is not seekable, or
/// <paramref name="bufferSize"/> is less than 1.</exception>
public static IObservable<byte[]> ReverseAsyncRead (
    this Stream stream,
    int bufferSize) {

    ValidateParameters (stream, bufferSize);

    return Observable.Iterate<byte[]> (observer => {
        // The length is sampled when this delegate runs, not when the
        // observable is created.
        long endOfStream = stream.Length;
        return ReverseAsyncRead (stream, bufferSize, observer, endOfStream);
    });
}

/// <summary>
/// Reads <paramref name="stream"/> backwards in blocks of
/// <paramref name="bufferSize"/> bytes, beginning at
/// <paramref name="startingPosition"/> and working toward offset 0.
/// </summary>
/// <param name="stream">A non-null, seekable stream.</param>
/// <param name="bufferSize">Block size in bytes; must be at least 1.</param>
/// <param name="startingPosition">Non-negative stream offset at which the
/// backwards read begins.</param>
/// <returns>An observable built by <c>Observable.Iterate</c> over the private
/// ReverseAsyncRead iterator.</returns>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">The stream is not seekable,
/// <paramref name="bufferSize"/> is less than 1, or
/// <paramref name="startingPosition"/> is negative.</exception>
public static IObservable<byte[]> ReverseAsyncRead (
    this Stream stream,
    int bufferSize,
    long startingPosition) {

    ValidateParameters (stream, bufferSize);

    bool startsBeforeOrigin = startingPosition < 0;
    if (startsBeforeOrigin) {
        throw new ArgumentOutOfRangeException ("startingPosition");
    }

    return Observable.Iterate<byte[]> (observer => {
        return ReverseAsyncRead (stream, bufferSize, observer, startingPosition);
    });
}

Using IEnumerable<IObservable<object>>, yield, Observable.FromAsyncPattern(), and Observable.Iterate(), we’re able to read a stream in blocks (of bufferSize) from the end to the front. Each public overload validates its arguments with ValidateParameters() and then calls Iterate(), which passes an observer instance to the private ReverseAsyncRead() iterator shown below:

/// <summary>
/// Argument checks shared by the public ReverseAsyncRead overloads.
/// </summary>
/// <param name="stream">Must be non-null and report <c>CanSeek == true</c>.</param>
/// <param name="bufferSize">Must be at least 1.</param>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="stream"/> cannot
/// seek, or <paramref name="bufferSize"/> is less than 1.</exception>
static void ValidateParameters (Stream stream, int bufferSize) {
    if (stream == null) {
        throw new ArgumentNullException ("stream");
    }

    bool seekable = stream.CanSeek;
    if (!seekable) {
        throw new ArgumentOutOfRangeException ("stream", "stream.CanSeek is false");
    }

    if (bufferSize <= 0) {
        throw new ArgumentOutOfRangeException ("bufferSize");
    }
}

/// <summary>
/// Iterator handed to <c>Observable.Iterate</c>: walks <paramref name="stream"/>
/// backwards from <paramref name="startingPosition"/> toward offset 0, issuing
/// one BeginRead/EndRead per block and pushing each block to
/// <paramref name="observer"/>.
/// </summary>
/// <remarks>
/// Each block covers the <paramref name="bufferSize"/> bytes immediately in
/// front of the current position; the final block is shortened to the bytes
/// that remain before offset 0. Every array passed to <c>OnNext</c> is a fresh
/// copy sized to the bytes actually read, so observers may keep it.
/// </remarks>
/// <param name="stream">Seekable stream to read (validated by the public overloads).</param>
/// <param name="bufferSize">Maximum number of bytes per block.</param>
/// <param name="observer">Receives the blocks, then OnCompleted, or OnError if
/// starting a read or delivering a block throws.</param>
/// <param name="startingPosition">Offset at which the backwards walk begins.</param>
/// <returns>The sequence of pending read operations, one per block.</returns>
static IEnumerable<IObservable<object>> ReverseAsyncRead (
    Stream stream,
    int bufferSize,
    IObserver<byte[]> observer,
    long startingPosition) {

    Func<byte[], int, int, IObservable<int>> asyncRead =
        Observable.FromAsyncPattern<byte[], int, int, int> (
            stream.BeginRead,
            stream.EndRead);

    byte[] buffer = new byte[bufferSize];
    long position = startingPosition;
    int read;

    do {
        // Size this block to what is left in front of the current position.
        // (The previous code used the overshoot past zero as the size, which
        // re-read bytes that had already been emitted.)
        int count = (int)Math.Min ((long)bufferSize, position);

        // Step back by the block size rather than by the previous read count,
        // so a short read cannot make consecutive windows overlap.
        position -= count;
        stream.Position = position;

        // call stream's Begin/End Read
        {
            ListObservable<int> asyncReadResult;
            try {
                asyncReadResult = asyncRead (buffer, 0, count).Start ();
            }
            catch (Exception exception) {
                observer.OnError (exception);
                yield break;
            }

            // Hand the pending read to the Iterate driver; the value is read
            // only after control returns here.
            yield return asyncReadResult;
            read = asyncReadResult.Value;
        }

        // send value read to observer
        try {
            // Always emit a right-sized copy: the shared buffer is overwritten
            // by the next read and may be longer than the bytes just read.
            byte[] chunk = new byte[read];
            Array.Copy (buffer, chunk, read);
            observer.OnNext (chunk);
        }
        catch (Exception exception) {
            observer.OnError (exception);
            yield break;
        }

        // continue until we can't read anymore
    } while (read > 0 && position > 0);

    observer.OnCompleted ();
}

Using yield and Stream’s async method calls, we’re able to observe all byte[] blocks in the stream reading backwards.

George Tsiokos
Author
George Tsiokos

Leave a comment

Preview

Comments are reviewed before publishing.