IEnumerable & IObservable.Split()
Another IObservable<> extension method I needed was Split — just like string.Split(), but generically, for any array, and an IObservable<T[]>. I started first with a generic IEnumerable<T>.Split<T>():
/// <summary>
/// Returns one or more arrays that contains the subarrays in this
/// instance that are delimited by elements of a specified array.
/// </summary>
/// <typeparam name="T">type of the array</typeparam>
/// <param name="value">value to split</param>
/// <param name="separator">one or more separators</param>
/// <remarks>not efficient with separator lengths > 1</remarks>
/// <returns></returns>
public static IEnumerable<T[]> Split<T> (this T[] value, params T[] separator) {
int separatorLength = separator.Length;
int startingIndex = 0;
int index = -1;
int length;
do {
/* loop through each item in the separator
* array verifying it exists in the source array */
for (int i = 0; i < separatorLength; i++) {
index = Array.IndexOf (value, separator[i], startingIndex);
if (-1 == index)
break;
}
if (-1 < index) {
// delimiter matched successfully
length = index - startingIndex;
T[] output = new T[length];
Array.Copy (value, startingIndex, output, 0, length);
yield return output;
startingIndex = index + 1;
}
} while (-1 < index);
if (0 == startingIndex)
// no match, send entire value
yield return value;
else {
// no more matches, send items left
length = value.Length - startingIndex;
T[] output = new T[length];
Array.Copy (value, startingIndex, output, 0, length);
}
}
Now that I can split an array, I need an IObservable<T[]>.Split() as well:
/// <summary>
/// Splits the arrays from the source observable by the delimiter
/// </summary>
/// <typeparam name="TSource">type of the source array</typeparam>
/// <param name="source">source array</param>
/// <param name="separator">the delimiter</param>
/// <returns>An observable containing the same number of items (if the
/// delimiter is not found) or additional items by splitting the incoming
/// arrays</returns>
public static IObservable<TSource[]> Split<TSource> (
this IObservable<TSource[]> source, params TSource[] separator) {
if (source == null)
throw new ArgumentNullException ("source");
if (separator.Length < 1)
throw new ArgumentOutOfRangeException ("delimiter");
return Observable.CreateWithDisposable<TSource[]> (observer =>
source.Subscribe<TSource[]> (value =>
value.Split (separator).Run (observer.OnNext),
observer.OnError, observer.OnCompleted)
);
}
Of course these extension methods can be further optimized and handle more use cases — especially by using IList<T> instead of array.