Mastering ReactiveUI.Extensions: A Comprehensive Guide to Async Reactive Programming in .NET
Author: Chris Pulman
Published: March 16, 2026
ReactiveUI.Extensions represents a significant evolution in reactive programming for .NET, providing a fully async-native observable framework that bridges the gap between traditional System.Reactive (Rx.NET) and modern async/await patterns. This library introduces IObservableAsync<T>, IObserverAsync<T>, and a comprehensive suite of operators that work seamlessly with ValueTask, CancellationToken, and IAsyncDisposable throughout the entire reactive pipeline.
This article provides an extensive, in-depth technical exploration of ReactiveUI.Extensions, covering every public function, advanced multi-threading scenarios, and practical patterns for building robust, scalable reactive applications.
Table of Contents
- 1. Introduction to ReactiveUI.Extensions
- 2. Understanding IObservableAsync<T> vs IObservable<T>
- 3. Understanding IObserverAsync<T> vs IObserver<T>
- 4. Core Async Interfaces and Types
- 5. Factory Methods and Observable Creation
- 6. Transformation Operators
- 7. Filtering Operators
- 8. Combining and Merging Operators
- 9. Error Handling and Resilience
- 10. Timing and Scheduling
- 11. Aggregation and Terminal Operators
- 12. Async Disposables and Resource Management
- 13. Subjects and Multicasting
- 14. Bridging Classic and Async Observables
- 15. Classic Reactive Extensions Operators
- 16. Advanced Multi-Threading Examples
- 17. Performance Considerations
- 18. Best Practices
1. Introduction to ReactiveUI.Extensions
Why ReactiveUI.Extensions?
Traditional System.Reactive (Rx.NET) was designed before the widespread adoption of async/await in C#. While Rx.NET provides excellent support for asynchronous data streams, it has several limitations in modern .NET development:
- 1. Synchronous Observer Callbacks: IObserver<T> methods (OnNext, OnError, OnCompleted) are synchronous, so async work inside them must either be blocked on or fired and forgotten; blocking can starve the thread pool
- 2. No Built-in Cancellation: Traditional observables don't integrate naturally with CancellationToken
- 3. Synchronous Disposal: IDisposable doesn't support async cleanup operations
- 4. Task Integration Friction: Converting between Task, IAsyncEnumerable, and IObservable requires boilerplate
ReactiveUI.Extensions solves these problems by providing:
- End-to-End Async: Every notification (OnNextAsync, OnErrorResumeAsync, OnCompletedAsync) returns ValueTask
- Cancellation-First Design: Subscriptions, observer callbacks, and terminal operators accept a CancellationToken
- Async Disposal: Subscriptions return IAsyncDisposable for proper async resource cleanup
- Seamless Interop: Bidirectional bridging between IObservable and IObservableAsync
- Modern .NET Support: Requires .NET 8 or later, leveraging modern language features
Installation
dotnet add package ReactiveUI.Extensions
Supported Target Frameworks: .NET Framework 4.6.2, .NET Framework 4.7.2, .NET Framework 4.8.1, .NET 8, .NET 9, .NET 10
2. Understanding IObservableAsync<T> vs IObservable<T>
IObservable<T> (Traditional Rx.NET)
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
Characteristics:
- Synchronous subscription
- Returns IDisposable for cleanup
- Observer callbacks are synchronous
- No built-in cancellation support
- Can block threads during async operations
IObservableAsync<T> (ReactiveUI.Extensions)
public interface IObservableAsync<T>
{
ValueTask<IAsyncDisposable> SubscribeAsync(
IObserverAsync<T> observer,
CancellationToken cancellationToken);
}
Characteristics:
- Asynchronous subscription (ValueTask)
- Returns IAsyncDisposable for async cleanup
- Observer callbacks are asynchronous (ValueTask)
- Built-in CancellationToken support
- Non-blocking throughout the pipeline
Key Differences Table
| Feature | IObservable | IObservableAsync |
|---|---|---|
| Subscription | Synchronous | Asynchronous (ValueTask) |
| Disposal | IDisposable | IAsyncDisposable |
| OnNext | void | ValueTask |
| OnError | void | ValueTask |
| OnCompleted | void | ValueTask |
| Cancellation | Manual | Built-in CancellationToken |
| Thread Blocking | Possible | Avoided by design |
| .NET Version | Any | .NET 8+ |
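The right-hand column can be made concrete with a minimal sketch built only on BCL types. `IMiniObserver<T>`, `IMiniObservable<T>`, `CountdownSource`, and `CollectingObserver` are hypothetical stand-ins invented for this illustration, not the library's types; the sketch only shows how an awaited subscription, awaited callbacks, and an `IAsyncDisposable` handle fit together.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins that mirror the shape of the async contract; not the library's types.
public interface IMiniObserver<in T>
{
    ValueTask OnNextAsync(T value, CancellationToken cancellationToken);
    ValueTask OnCompletedAsync();
}

public interface IMiniObservable<out T>
{
    ValueTask<IAsyncDisposable> SubscribeAsync(
        IMiniObserver<T> observer, CancellationToken cancellationToken);
}

public sealed class CountdownSource : IMiniObservable<int>
{
    private readonly int _from;
    public CountdownSource(int from) => _from = from;

    public async ValueTask<IAsyncDisposable> SubscribeAsync(
        IMiniObserver<int> observer, CancellationToken cancellationToken)
    {
        for (var i = _from; i > 0; i--)
        {
            cancellationToken.ThrowIfCancellationRequested();
            // The source awaits the observer, so no thread is blocked while it works.
            await observer.OnNextAsync(i, cancellationToken);
        }
        await observer.OnCompletedAsync();
        return new NoopAsyncDisposable();
    }

    private sealed class NoopAsyncDisposable : IAsyncDisposable
    {
        public ValueTask DisposeAsync() => default;
    }
}

public sealed class CollectingObserver : IMiniObserver<int>
{
    public List<int> Values { get; } = new();
    public bool Completed { get; private set; }

    public async ValueTask OnNextAsync(int value, CancellationToken cancellationToken)
    {
        await Task.Yield(); // stands in for real async work such as I/O
        Values.Add(value);
    }

    public ValueTask OnCompletedAsync()
    {
        Completed = true;
        return default;
    }
}
```

Because the source awaits each `OnNextAsync` call, a slow observer slows the producer down rather than tying up a thread, which is the property the table contrasts with synchronous callbacks.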
When to Use Each
Use IObservableAsync<T> when:
- Building new async-first applications
- Working with I/O-bound operations (network, database, file system)
- Needing proper cancellation support
- Requiring async resource cleanup
- Targeting .NET 8 or later
Use IObservable<T> when:
- Maintaining legacy code
- Working with CPU-bound operations
- Interoperating with existing Rx.NET libraries
- Targeting older .NET versions
3. Understanding IObserverAsync<T> vs IObserver<T>
IObserver<T> (Traditional)
public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}
Problems:
- 1. No Async Support: Cannot await async operations in callbacks
- 2. Exception Handling: Exceptions in callbacks can crash the application
- 3. Resource Cleanup: No async disposal mechanism
- 4. Backpressure: Difficult to implement async backpressure
IObserverAsync<T> (ReactiveUI.Extensions)
public interface IObserverAsync<in T> : IAsyncDisposable
{
ValueTask OnNextAsync(T value, CancellationToken cancellationToken);
ValueTask OnErrorResumeAsync(Exception error, CancellationToken cancellationToken);
ValueTask OnCompletedAsync(Result result);
}
Advantages:
- 1. Full Async Support: All callbacks are ValueTask-based
- 2. Error Recovery: OnErrorResumeAsync allows error recovery and continuation
- 3. Async Disposal: Implements IAsyncDisposable for proper cleanup
- 4. Cancellation: OnNextAsync and OnErrorResumeAsync receive a CancellationToken
- 5. Result Tracking: OnCompletedAsync receives a Result indicating success or failure
Practical Example: Traditional vs Async Observer
// Traditional IObserver - Problem: Blocking async operations
public class TraditionalObserver : IObserver<string>
{
public void OnNext(string value)
{
// BAD: Blocking async operation
var result = SaveToDatabaseAsync(value).Result; // Thread pool starvation!
Console.WriteLine($"Saved: {result}");
}
public void OnError(Exception error) => Console.WriteLine($"Error: {error}");
public void OnCompleted() => Console.WriteLine("Completed");
}
// Async IObserverAsync - Solution: Non-blocking throughout
public class AsyncObserver : ObserverAsync<string>
{
protected override async ValueTask OnNextAsyncCore(
string value,
CancellationToken cancellationToken)
{
// GOOD: Non-blocking async operation
var result = await SaveToDatabaseAsync(value, cancellationToken);
Console.WriteLine($"Saved: {result}");
}
protected override async ValueTask OnErrorResumeAsyncCore(
Exception error,
CancellationToken cancellationToken)
{
// Can attempt recovery
await LogErrorAsync(error, cancellationToken);
// Can choose to continue or stop
}
protected override async ValueTask OnCompletedAsyncCore(Result result)
{
await CleanupAsync();
Console.WriteLine($"Completed: {result.IsSuccess}");
}
protected override ValueTask DisposeAsyncCore() => CleanupResourcesAsync();
}
Error Recovery with OnErrorResumeAsync
One of the most powerful features of IObserverAsync<T> is the ability to recover from errors:
public class ResilientObserver : ObserverAsync<int>
{
private int _retryCount = 0;
private const int MaxRetries = 3;
protected override async ValueTask OnNextAsyncCore(
int value,
CancellationToken cancellationToken)
{
try
{
await ProcessValueAsync(value, cancellationToken);
}
catch (TimeoutException)
{
// Rethrown unchanged so the error reaches OnErrorResumeAsyncCore
throw;
}
}
protected override async ValueTask OnErrorResumeAsyncCore(
Exception error,
CancellationToken cancellationToken)
{
if (_retryCount < MaxRetries)
{
_retryCount++;
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
// Continue processing - don't propagate error
return;
}
// Max retries exceeded - log and continue
await LogErrorAsync(error, cancellationToken);
}
protected override ValueTask OnCompletedAsyncCore(Result result)
{
Console.WriteLine($"Completed with {(result.IsSuccess ? "success" : "failure")}");
return default;
}
}
4. Core Async Interfaces and Types
IObservableAsync
public interface IObservableAsync<out T>
{
ValueTask<IAsyncDisposable> SubscribeAsync(
IObserverAsync<T> observer,
CancellationToken cancellationToken);
}
Purpose: Represents an asynchronous push-based notification provider.
Key Points:
- Subscription is asynchronous (ValueTask)
- Returns IAsyncDisposable for async cleanup
- Supports cancellation via CancellationToken
- Thread-safe for concurrent subscriptions
IObserverAsync
public interface IObserverAsync<in T> : IAsyncDisposable
{
ValueTask OnNextAsync(T value, CancellationToken cancellationToken);
ValueTask OnErrorResumeAsync(Exception error, CancellationToken cancellationToken);
ValueTask OnCompletedAsync(Result result);
}
Purpose: Defines an asynchronous observer that receives notifications.
Key Points:
- All callbacks are asynchronous
- Implements IAsyncDisposable for cleanup
- OnErrorResumeAsync allows error recovery
- Result parameter indicates completion status
ObservableAsync (Abstract Base Class)
public abstract class ObservableAsync<T> : IObservableAsync<T>
{
public async ValueTask<IAsyncDisposable> SubscribeAsync(
IObserverAsync<T> observer,
CancellationToken cancellationToken)
{
var subscription = await SubscribeAsyncCore(observer, cancellationToken);
return subscription;
}
protected abstract ValueTask<IAsyncDisposable> SubscribeAsyncCore(
IObserverAsync<T> observer,
CancellationToken cancellationToken);
}
Purpose: Base class for creating custom async observables.
Example: Custom Async Observable
public class TickObservable : ObservableAsync<int>
{
private readonly int _count;
private readonly TimeSpan _interval;
public TickObservable(int count, TimeSpan interval)
{
_count = count;
_interval = interval;
}
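protected override ValueTask<IAsyncDisposable> SubscribeAsyncCore(
IObserverAsync<int> observer,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
// Capture the token once; reading cts.Token after the source is disposed throws
var token = cts.Token;
// Emit on a background task so SubscribeAsync returns immediately
_ = Task.Run(async () =>
{
try
{
for (int i = 0; i < _count; i++)
{
await observer.OnNextAsync(i, token);
await Task.Delay(_interval, token);
}
await observer.OnCompletedAsync(Result.Success);
}
catch (OperationCanceledException)
{
// Unsubscribed or cancelled: complete quietly
await observer.OnCompletedAsync(Result.Success);
}
catch (Exception ex)
{
// Unexpected failure: end the sequence with a failure result
await observer.OnCompletedAsync(Result.Failure(ex));
}
});
// The return type is IAsyncDisposable, so wrap cancellation in DisposableAsync.Create
return new ValueTask<IAsyncDisposable>(DisposableAsync.Create(() =>
{
cts.Cancel();
cts.Dispose();
return default;
}));
}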
}
// Usage
var ticks = new TickObservable(10, TimeSpan.FromMilliseconds(100));
await using var subscription = await ticks.SubscribeAsync(
async (value, ct) => Console.WriteLine($"Tick: {value}"),
CancellationToken.None);
ConnectableObservableAsync
public class ConnectableObservableAsync<T> : ObservableAsync<T>
{
public ValueTask<IAsyncDisposable> ConnectAsync(CancellationToken cancellationToken);
}
Purpose: Extends ObservableAsync with deferred connection for multicasting.
Use Case: When you want to control when the source starts emitting (e.g., after multiple subscribers are ready).
Result
public readonly struct Result
{
public bool IsSuccess { get; }
public Exception? Exception { get; }
public static Result Success { get; } = new(true, null);
public static Result Failure(Exception exception) => new(false, exception);
}
Purpose: Carries completion status (success or failure) to OnCompletedAsync.
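The listing above omits the constructor, so it does not compile on its own. As a rough illustration of how such a type can be built and consumed, here is a self-contained stand-in; `CompletionResult` and `CompletionReporter` are hypothetical names chosen for this sketch and are not part of the library.

```csharp
using System;

// Hypothetical stand-in with the same surface as the Result listing; not the library's type.
public readonly struct CompletionResult
{
    private CompletionResult(bool isSuccess, Exception? exception)
    {
        IsSuccess = isSuccess;
        Exception = exception;
    }

    public bool IsSuccess { get; }
    public Exception? Exception { get; }

    public static CompletionResult Success { get; } = new(true, null);

    public static CompletionResult Failure(Exception exception) =>
        new(false, exception ?? throw new ArgumentNullException(nameof(exception)));
}

public static class CompletionReporter
{
    // Turns a completion status into a log line, as a completion callback might.
    public static string Describe(CompletionResult result) =>
        result.IsSuccess
            ? "completed"
            : $"failed: {result.Exception?.Message}";
}
```

One design consequence of using a struct: `default(CompletionResult)` has `IsSuccess == false` and a null `Exception`, which is why the sketch reads the exception through `?.` instead of assuming it is present.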
5. Factory Methods and Observable Creation
ObservableAsync.Create
Creates an observable from an async subscription delegate.
public static IObservableAsync<T> Create<T>(
Func<IObserverAsync<T>, CancellationToken, ValueTask<IAsyncDisposable>> subscribeAsync)
Example:
var custom = ObservableAsync.Create<string>(async (observer, ct) =>
{
await observer.OnNextAsync("Hello", ct);
await observer.OnNextAsync("World", ct);
await observer.OnCompletedAsync(Result.Success);
return DisposableAsync.Empty;
});
await using var sub = await custom.SubscribeAsync(
async (value, ct) => Console.WriteLine(value),
CancellationToken.None);
ObservableAsync.CreateAsBackgroundJob
Runs the subscription logic as a background task.
public static IObservableAsync<T> CreateAsBackgroundJob<T>(
Func<IObserverAsync<T>, CancellationToken, ValueTask> job,
bool startSynchronously = false)
Example:
var background = ObservableAsync.CreateAsBackgroundJob<int>(async (observer, ct) =>
{
for (int i = 0; i < 5; i++)
{
await observer.OnNextAsync(i, ct);
await Task.Delay(100, ct);
}
await observer.OnCompletedAsync(Result.Success);
});
ObservableAsync.Return
Emits a single value and completes.
public static IObservableAsync<T> Return<T>(T value)
Example:
var single = ObservableAsync.Return(42);
ObservableAsync.Empty
Completes immediately without emitting.
public static IObservableAsync<T> Empty<T>()
ObservableAsync.Never
Never emits and never completes.
public static IObservableAsync<T> Never<T>()
ObservableAsync.Throw
Completes immediately with an error.
public static IObservableAsync<T> Throw<T>(Exception exception)
ObservableAsync.Range
Emits a sequence of integers.
public static IObservableAsync<long> Range(long start, long count)
Example:
var numbers = ObservableAsync.Range(1, 10); // 1, 2, 3, ..., 10
ObservableAsync.Interval
Emits incrementing values at regular intervals.
public static IObservableAsync<long> Interval(
TimeSpan period,
TimeProvider? timeProvider = null)
Example:
var timer = ObservableAsync.Interval(TimeSpan.FromSeconds(1));
ObservableAsync.Timer
Emits a value after a delay.
public static IObservableAsync<long> Timer(
TimeSpan dueTime,
TimeProvider? timeProvider = null)
Example:
var delayed = ObservableAsync.Timer(TimeSpan.FromSeconds(5));
ObservableAsync.Defer
Defers observable creation until subscription.
public static IObservableAsync<T> Defer<T>(
Func<IObservableAsync<T>> factory)
Example:
var deferred = ObservableAsync.Defer(() =>
ObservableAsync.Return(DateTime.Now.Second));
ObservableAsync.FromAsync
Wraps an async function as an observable.
public static IObservableAsync<T> FromAsync<T>(
Func<CancellationToken, Task<T>> factory)
Example:
var fromTask = ObservableAsync.FromAsync(async ct =>
{
await Task.Delay(100, ct);
return 42;
});
Conversion Extensions
// Task to IObservableAsync
public static IObservableAsync<T> ToObservableAsync<T>(this Task<T> task)
// IAsyncEnumerable to IObservableAsync
public static IObservableAsync<T> ToObservableAsync<T>(
this IAsyncEnumerable<T> asyncEnumerable)
// IEnumerable to IObservableAsync
public static IObservableAsync<T> ToObservableAsync<T>(
this IEnumerable<T> enumerable)
// IObservable to IObservableAsync (Bridge)
public static IObservableAsync<T> ToObservableAsync<T>(
this IObservable<T> observable)
// IObservableAsync to IObservable (Bridge)
public static IObservable<T> ToObservable<T>(
this IObservableAsync<T> asyncObservable)
Example:
// From Task
var task = Task.FromResult(42);
var obs = task.ToObservableAsync();
// From IAsyncEnumerable
async IAsyncEnumerable<int> GenerateAsync()
{
for (int i = 0; i < 5; i++)
{
await Task.Delay(50);
yield return i;
}
}
var fromAsyncEnum = GenerateAsync().ToObservableAsync();
// Bridge from classic IObservable
var classic = Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(5);
var asyncVersion = classic.ToObservableAsync();
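The `GenerateAsync` iterator above has no way to be cancelled. When an async sequence feeds a cancellation-aware pipeline, it may help for the iterator itself to honor a token. The sketch below uses only BCL features (`[EnumeratorCancellation]` and `WithCancellation`); the class name `Generators` and the `count` parameter are assumptions made for this illustration, and whether `ToObservableAsync` forwards a token to the enumerator is not stated in this article.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class Generators
{
    // [EnumeratorCancellation] lets the token given to WithCancellation reach the iterator body.
    public static async IAsyncEnumerable<int> GenerateAsync(
        int count,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Delay(10, cancellationToken); // throws if cancellation is requested
            yield return i;
        }
    }

    // Consumes the sequence while passing the caller's token to the iterator.
    public static async Task<List<int>> CollectAsync(int count, CancellationToken cancellationToken)
    {
        var items = new List<int>();
        await foreach (var item in GenerateAsync(count).WithCancellation(cancellationToken))
        {
            items.Add(item);
        }
        return items;
    }
}
```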
6. Transformation Operators
Select (Sync)
Projects each element using a synchronous function.
public static IObservableAsync<TResult> Select<TSource, TResult>(
this IObservableAsync<TSource> source,
Func<TSource, TResult> selector)
Example:
var source = ObservableAsync.Range(1, 5);
var doubled = source.Select(x => x * 2); // 2, 4, 6, 8, 10
Select (Async)
Projects each element using an async function.
public static IObservableAsync<TResult> Select<TSource, TResult>(
this IObservableAsync<TSource> source,
Func<TSource, CancellationToken, ValueTask<TResult>> asyncSelector)
Example:
var projected = source.Select(async (x, ct) =>
{
await Task.Delay(10, ct);
return x.ToString();
});
SelectMany
Flattens nested async observables.
public static IObservableAsync<TResult> SelectMany<TSource, TResult>(
this IObservableAsync<TSource> source,
Func<TSource, IObservableAsync<TResult>> selector)
Example:
var flattened = source.SelectMany(x =>
ObservableAsync.Range(x * 10, 3));
// For x=1: 10, 11, 12
// For x=2: 20, 21, 22
// etc.
Scan (Sync)
Applies an accumulator over the sequence.
public static IObservableAsync<TAccumulate> Scan<TSource, TAccumulate>(
this IObservableAsync<TSource> source,
TAccumulate seed,
Func<TAccumulate, TSource, TAccumulate> accumulator)
Example:
// Range emits long values, so the seed is a long as well
var runningTotal = source.Scan(0L, (acc, x) => acc + x);
// 1, 3, 6, 10, 15
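To show what "applies an accumulator over the sequence" means step by step, here is a minimal sketch of scan semantics over `IAsyncEnumerable<T>` using only BCL types. `ScanSketch`, `ScanAsync`, `RangeAsync`, and `RunningTotalsAsync` are hypothetical helpers written for this illustration; they are not the library's operators, and the library's implementation may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ScanSketch
{
    // Emits every intermediate accumulator value; an aggregate would yield only the last one.
    public static async IAsyncEnumerable<TAcc> ScanAsync<T, TAcc>(
        IAsyncEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> accumulator)
    {
        var acc = seed;
        await foreach (var item in source)
        {
            acc = accumulator(acc, item);
            yield return acc;
        }
    }

    // A small async counterpart of a numeric range, used as test input.
    public static async IAsyncEnumerable<long> RangeAsync(long start, long count)
    {
        for (long i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return start + i;
        }
    }

    public static async Task<List<long>> RunningTotalsAsync()
    {
        var totals = new List<long>();
        await foreach (var total in ScanAsync(RangeAsync(1, 5), 0L, (acc, x) => acc + x))
        {
            totals.Add(total);
        }
        return totals; // 1, 3, 6, 10, 15
    }
}
```

In this sketch the seed itself is not emitted; only the result of each accumulation step is, which matches the running totals shown in the example above.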
Scan (Async)
Async accumulator.
public static IObservableAsync<TAccumulate> Scan<TSource, TAccumulate>(
this IObservableAsync<TSource> source,
TAccumulate seed,
Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> asyncAccumulator)
Cast
Casts each element to the specified type.
public static IObservableAsync<TResult> Cast<TSource, TResult>(
this IObservableAsync<TSource> source)
OfType
Filters elements assignable to the specified type.
public static IObservableAsync<TResult> OfType<TSource, TResult>(
this IObservableAsync<TSource> source)
GroupBy
Groups elements by key.
public static IObservableAsync<GroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(
this IObservableAsync<TSource> source,
Func<TSource, TKey> keySelector)
Example:
var grouped = source.GroupBy(x => x % 2 == 0 ? "even" : "odd");
7. Filtering Operators
Where (Sync)
Filters elements using a synchronous predicate.
public static IObservableAsync<T> Where<T>(
this IObservableAsync<T> source,
Func<T, bool> predicate)
Example:
var evens = source.Where(x => x % 2 == 0);
Where (Async)
Filters using an async predicate.
public static IObservableAsync<T> Where<T>(
this IObservableAsync<T> source,
Func<T, CancellationToken, ValueTask<bool>> asyncPredicate)
Example:
var asyncFiltered = source.Where(async (x, ct) =>
{
await Task.Delay(1, ct);
return x > 5;
});
Take
Takes the first N elements.
public static IObservableAsync<T> Take<T>(
this IObservableAsync<T> source,
int count)
Skip
Skips the first N elements.
public static IObservableAsync<T> Skip<T>(
this IObservableAsync<T> source,
int count)
TakeWhile
Takes elements while predicate holds.
public static IObservableAsync<T> TakeWhile<T>(
this IObservableAsync<T> source,
Func<T, bool> predicate)
SkipWhile
Skips elements while predicate holds.
public static IObservableAsync<T> SkipWhile<T>(
this IObservableAsync<T> source,
Func<T, bool> predicate)
TakeUntil (Multiple Overloads)
Takes elements until a signal.
// Until another observable
public static IObservableAsync<T> TakeUntil<T>(
this IObservableAsync<T> source,
IObservableAsync<Unit> other)
// Until a task
public static IObservableAsync<T> TakeUntil<T>(
this IObservableAsync<T> source,
Task other)
// Until cancellation token
public static IObservableAsync<T> TakeUntil<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
// Until predicate
public static IObservableAsync<T> TakeUntil<T>(
this IObservableAsync<T> source,
Func<T, bool> predicate)
Example:
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var bounded = ObservableAsync.Interval(TimeSpan.FromMilliseconds(100))
.TakeUntil(cts.Token);
Distinct
Emits only unique elements.
public static IObservableAsync<T> Distinct<T>(
this IObservableAsync<T> source)
DistinctBy
Emits only elements with unique keys.
public static IObservableAsync<T> DistinctBy<T, TKey>(
this IObservableAsync<T> source,
Func<T, TKey> keySelector)
DistinctUntilChanged
Suppresses consecutive duplicates.
public static IObservableAsync<T> DistinctUntilChanged<T>(
this IObservableAsync<T> source)
DistinctUntilChangedBy
Suppresses consecutive elements with same key.
public static IObservableAsync<T> DistinctUntilChangedBy<T, TKey>(
this IObservableAsync<T> source,
Func<T, TKey> keySelector)
Example:
var items = new[] { 1, 2, 2, 3, 1, 3 }.ToObservableAsync();
var unique = items.Distinct(); // 1, 2, 3
var noConsecDups = items.DistinctUntilChanged(); // 1, 2, 3, 1, 3
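The difference between the two operators comes down to how much history each one keeps. The sketch below reproduces both behaviors over a plain `IEnumerable<T>` using only BCL types; `DistinctSketch` and its methods are hypothetical helpers for illustration, not the library's operators.

```csharp
using System;
using System.Collections.Generic;

public static class DistinctSketch
{
    // Remembers every value seen so far, so memory grows with the number of unique values.
    public static IEnumerable<T> Distinct<T>(IEnumerable<T> source)
    {
        var seen = new HashSet<T>();
        foreach (var item in source)
        {
            if (seen.Add(item)) yield return item;
        }
    }

    // Remembers only the previous value, so a value may reappear after something else.
    public static IEnumerable<T> DistinctUntilChanged<T>(IEnumerable<T> source)
    {
        var comparer = EqualityComparer<T>.Default;
        var hasPrevious = false;
        T previous = default!;
        foreach (var item in source)
        {
            if (!hasPrevious || !comparer.Equals(previous, item))
            {
                yield return item;
            }
            previous = item;
            hasPrevious = true;
        }
    }
}
```

Applied to `{ 1, 2, 2, 3, 1, 3 }`, the first helper yields 1, 2, 3 and the second yields 1, 2, 3, 1, 3, the same results as in the example above. On long-running streams the constant memory of the second approach is often the deciding factor.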
8. Combining and Merging Operators
Merge
Merges multiple observables, interleaving values.
// Two sources
public static IObservableAsync<T> Merge<T>(
this IObservableAsync<T> first,
IObservableAsync<T> second)
// Multiple sources with concurrency limit
public static IObservableAsync<T> Merge<T>(
this IObservableAsync<IObservableAsync<T>> sources,
int maxConcurrency)
Example:
var a = ObservableAsync.Range(1, 3); // 1, 2, 3
var b = ObservableAsync.Range(10, 3); // 10, 11, 12
var merged = a.Merge(b); // Interleaved, e.g. 1, 10, 2, 11, 3, 12 (order may vary)
Concat
Concatenates observables sequentially.
public static IObservableAsync<T> Concat<T>(
this IObservableAsync<T> first,
IObservableAsync<T> second)
Example:
var sequential = a.Concat(b); // 1, 2, 3, 10, 11, 12
Switch
Switches to the most recent inner observable.
public static IObservableAsync<T> Switch<T>(
this IObservableAsync<IObservableAsync<T>> sources)
Use Case: When you want to cancel previous operations when a new one arrives (e.g., search as you type).
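The "latest wins" idea behind this use case can be sketched without the library, using one `CancellationTokenSource` per request. `LatestOnlyRunner` and the `search` delegate are hypothetical names for this illustration; this is a plain-BCL approximation of the concept, not how `Switch` is implemented.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class LatestOnlyRunner : IDisposable
{
    private CancellationTokenSource? _current;

    // Cancels the previous operation (if any), then runs the new one.
    // Returns null when this call was superseded by a newer one.
    public async Task<string?> RunAsync(
        string query, Func<string, CancellationToken, Task<string>> search)
    {
        var cts = new CancellationTokenSource();
        CancelQuietly(Interlocked.Exchange(ref _current, cts));
        try
        {
            return await search(query, cts.Token);
        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested)
        {
            return null;
        }
        finally
        {
            // Clear the slot only if it still holds this call's source.
            Interlocked.CompareExchange(ref _current, null, cts);
            cts.Dispose();
        }
    }

    public void Dispose() => CancelQuietly(Interlocked.Exchange(ref _current, null));

    private static void CancelQuietly(CancellationTokenSource? cts)
    {
        try { cts?.Cancel(); }
        catch (ObjectDisposedException) { /* that operation had already finished */ }
    }
}
```

In a search box, each keystroke would call `RunAsync` with the current text; only the most recent call returns a result, and earlier in-flight searches observe cancellation through their token.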
CombineLatest
Combines latest values from multiple sources.
// Two sources
public static IObservableAsync<TResult> CombineLatest<T1, T2, TResult>(
this IObservableAsync<T1> first,
IObservableAsync<T2> second,
Func<T1, T2, TResult> selector)
// Up to 8 sources
public static IObservableAsync<TResult> CombineLatest<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(
IObservableAsync<T1> first,
IObservableAsync<T2> second,
// ... up to 8 sources
Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> selector)
Example:
var combined = a.CombineLatest(b, (x, y) => $"{x}+{y}");
// Emits whenever either source emits, with latest values from both
Zip
Pairs elements by index.
public static IObservableAsync<TResult> Zip<T1, T2, TResult>(
this IObservableAsync<T1> first,
IObservableAsync<T2> second,
Func<T1, T2, TResult> selector)
Example:
var zipped = a.Zip(b, (x, y) => x + y); // 11, 13, 15
Prepend
Prepends a single value.
public static IObservableAsync<T> Prepend<T>(
this IObservableAsync<T> source,
T value)
StartWith
Prepends multiple values.
public static IObservableAsync<T> StartWith<T>(
this IObservableAsync<T> source,
params T[] values)
Example:
var withPrefix = a.Prepend(0); // 0, 1, 2, 3
var withMany = a.StartWith(-2, -1, 0); // -2, -1, 0, 1, 2, 3
9. Error Handling and Resilience
Catch
Catches errors and switches to fallback.
public static IObservableAsync<T> Catch<T>(
this IObservableAsync<T> source,
Func<Exception, IObservableAsync<T>> handler)
Example:
var flaky = ObservableAsync.Throw<int>(new InvalidOperationException("Oops"));
var safe = flaky.Catch(ex => ObservableAsync.Return(-1)); // emits -1
CatchAndIgnoreErrorResume
Suppresses error-resume notifications.
public static IObservableAsync<T> CatchAndIgnoreErrorResume<T>(
this IObservableAsync<T> source)
Retry (Infinite)
Re-subscribes indefinitely on error.
public static IObservableAsync<T> Retry<T>(
this IObservableAsync<T> source)
Retry (Count-Limited)
Re-subscribes up to N times.
public static IObservableAsync<T> Retry<T>(
this IObservableAsync<T> source,
int count)
Example:
var retried = flaky.Retry(3); // Retry up to 3 times
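For a single async operation, the same bounded-retry idea can be sketched with plain BCL types. `RetrySketch.RetryAsync` is a hypothetical helper, and it assumes one particular counting rule: the operation runs once and is then retried up to `maxRetries` more times. Whether the library's `Retry(3)` counts total attempts or additional retries is not stated here, so treat the counting in this sketch as an assumption.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Runs the operation once, then up to maxRetries more times if it throws.
    public static async Task<T> RetryAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        int maxRetries,
        CancellationToken cancellationToken = default)
    {
        for (var attempt = 0; ; attempt++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            try
            {
                return await operation(cancellationToken);
            }
            catch (Exception ex) when (ex is not OperationCanceledException && attempt < maxRetries)
            {
                // Swallow the failure and loop to try again.
                // Once the retries are used up, the filter is false and the exception propagates.
            }
        }
    }
}
```

Cancellation is deliberately excluded from the retry filter so that a cancelled operation stops immediately instead of being retried.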
OnErrorResumeAsFailure
Converts error-resume to failure completion.
public static IObservableAsync<T> OnErrorResumeAsFailure<T>(
this IObservableAsync<T> source)
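10. Timing and Scheduling
Throttle
Emits a value only after the source has been quiet for the given time span (debounce semantics); values superseded within that window are dropped.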
public static IObservableAsync<T> Throttle<T>(
this IObservableAsync<T> source,
TimeSpan timeSpan,
TimeProvider? timeProvider = null)
Example:
var source = ObservableAsync.Interval(TimeSpan.FromMilliseconds(50)).Take(10);
var throttled = source.Throttle(TimeSpan.FromMilliseconds(200));
Delay
Delays each emission by the given time span.
public static IObservableAsync<T> Delay<T>(
this IObservableAsync<T> source,
TimeSpan timeSpan,
TimeProvider? timeProvider = null)
Timeout
Raises TimeoutException if no value arrives within the given time span, or switches to a fallback sequence when one is supplied.
// With exception
public static IObservableAsync<T> Timeout<T>(
this IObservableAsync<T> source,
TimeSpan timeSpan)
// With fallback
public static IObservableAsync<T> Timeout<T>(
this IObservableAsync<T> source,
TimeSpan timeSpan,
IObservableAsync<T> fallback)
Example:
var guarded = source.Timeout(TimeSpan.FromSeconds(2));
var withFallback = source.Timeout(
TimeSpan.FromSeconds(2),
ObservableAsync.Return(999L));
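The same "fail or fall back after a deadline" pattern exists for single tasks in the BCL, which can be a useful mental model for the operator. The sketch below uses `Task.WaitAsync(TimeSpan)`, available on .NET 6 and later; `TimeoutSketch.WithTimeoutAsync` is a hypothetical helper for illustration, not part of the library.

```csharp
using System;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Returns the operation's result, or the fallback if it does not finish in time.
    public static async Task<T> WithTimeoutAsync<T>(Task<T> operation, TimeSpan timeout, T fallback)
    {
        try
        {
            // WaitAsync throws TimeoutException when the timeout elapses first.
            return await operation.WaitAsync(timeout);
        }
        catch (TimeoutException)
        {
            return fallback;
        }
    }
}
```

Note that `WaitAsync` only stops waiting; it does not cancel the underlying operation. If the work should actually stop, pass it a `CancellationToken` and cancel that token when the timeout fires.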
ObserveOn
Shifts notifications to the specified context or scheduler.
public static IObservableAsync<T> ObserveOn<T>(
this IObservableAsync<T> source,
AsyncContext context)
public static IObservableAsync<T> ObserveOn<T>(
this IObservableAsync<T> source,
SynchronizationContext context)
public static IObservableAsync<T> ObserveOn<T>(
this IObservableAsync<T> source,
TaskScheduler scheduler)
public static IObservableAsync<T> ObserveOn<T>(
this IObservableAsync<T> source,
IScheduler scheduler)
Example:
var onContext = source.ObserveOn(
new AsyncContext(SynchronizationContext.Current!));
Yield
Yields control between notifications.
public static IObservableAsync<T> Yield<T>(
this IObservableAsync<T> source)
11. Aggregation and Terminal Operators
AggregateAsync
Applies accumulator and returns final result.
public static ValueTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(
this IObservableAsync<TSource> source,
TAccumulate seed,
Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator,
CancellationToken cancellationToken)
Example:
var source = ObservableAsync.Range(1, 5);
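// The accumulator matches the signature above: (accumulate, element, token) => ValueTask<TAccumulate>
long sum = await source.AggregateAsync(
0L,
(a, x, ct) => new ValueTask<long>(a + x),
CancellationToken.None); // 15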
CountAsync
Returns element count.
public static ValueTask<int> CountAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
LongCountAsync
Returns element count as long.
public static ValueTask<long> LongCountAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
AnyAsync
Returns true if any elements exist.
public static ValueTask<bool> AnyAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
public static ValueTask<bool> AnyAsync<T>(
this IObservableAsync<T> source,
Func<T, bool> predicate,
CancellationToken cancellationToken)
AllAsync
Returns true if all elements match predicate.
public static ValueTask<bool> AllAsync<T>(
this IObservableAsync<T> source,
Func<T, bool> predicate,
CancellationToken cancellationToken)
ContainsAsync
Returns true if sequence contains value.
public static ValueTask<bool> ContainsAsync<T>(
this IObservableAsync<T> source,
T value,
CancellationToken cancellationToken)
FirstAsync
Returns first element or throws.
public static ValueTask<T> FirstAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
FirstOrDefaultAsync
Returns first element or default.
public static ValueTask<T> FirstOrDefaultAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
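LastAsync / LastOrDefaultAsync
Returns the last element. By the usual LINQ convention, LastAsync throws on an empty sequence, while LastOrDefaultAsync returns the default value.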
public static ValueTask<T> LastAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
public static ValueTask<T> LastOrDefaultAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
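SingleAsync / SingleOrDefaultAsync
Returns the only element. By the usual LINQ convention, both throw if the sequence has more than one element; SingleAsync also throws on an empty sequence, while SingleOrDefaultAsync returns the default value.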
public static ValueTask<T> SingleAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
public static ValueTask<T> SingleOrDefaultAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
ToListAsync
Collects all elements into List.
public static ValueTask<List<T>> ToListAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
Example:
List<long> all = await source.ToListAsync(CancellationToken.None); // [1, 2, 3, 4, 5]
ToDictionaryAsync
Collects into Dictionary.
public static ValueTask<Dictionary<TKey, TSource>> ToDictionaryAsync<TSource, TKey>(
this IObservableAsync<TSource> source,
Func<TSource, TKey> keySelector,
CancellationToken cancellationToken)
ForEachAsync
Invokes async action for each element.
public static ValueTask ForEachAsync<T>(
this IObservableAsync<T> source,
Func<T, CancellationToken, ValueTask> action,
CancellationToken cancellationToken)
Example:
await source.ForEachAsync(async (x, ct) =>
Console.WriteLine($"Item: {x}"), CancellationToken.None);
WaitCompletionAsync
Awaits completion without capturing values.
public static ValueTask WaitCompletionAsync<T>(
this IObservableAsync<T> source,
CancellationToken cancellationToken)
12. Async Disposables and Resource Management
DisposableAsync
Provides async disposable utilities.
public static class DisposableAsync
{
public static IAsyncDisposable Empty { get; }
public static IAsyncDisposable Create(Func<ValueTask> disposeAsync);
}
CompositeDisposableAsync
Manages multiple async disposables.
public sealed class CompositeDisposableAsync : IAsyncDisposable
{
public CompositeDisposableAsync();
public CompositeDisposableAsync(int capacity);
public CompositeDisposableAsync(params IAsyncDisposable[] disposables);
public CompositeDisposableAsync(IEnumerable<IAsyncDisposable> disposables);
public bool IsDisposed { get; }
public int Count { get; }
public ValueTask AddAsync(IAsyncDisposable item);
public ValueTask<bool> RemoveAsync(IAsyncDisposable item);
public ValueTask ClearAsync();
public bool Contains(IAsyncDisposable item);
public ValueTask DisposeAsync();
}
Example:
var composite = new CompositeDisposableAsync();
await composite.AddAsync(someResource1);
await composite.AddAsync(someResource2);
// All resources disposed together
await composite.DisposeAsync();
SerialDisposableAsync
Manages a single async disposable that can be replaced.
public class SerialDisposableAsync : IAsyncDisposable
{
public ValueTask SetDisposableAsync(IAsyncDisposable? value);
public ValueTask DisposeAsync();
}
Use Case: When you need to replace a resource and automatically dispose the old one.
Example:
var serial = new SerialDisposableAsync();
// Set initial resource
await serial.SetDisposableAsync(resource1);
// Replace - resource1 is automatically disposed
await serial.SetDisposableAsync(resource2);
// Dispose - resource2 is disposed
await serial.DisposeAsync();
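To make the "replace disposes the old value" contract concrete, here is a minimal stand-in written against BCL types only. `MiniSerialDisposable` and `TrackedResource` are hypothetical classes for illustration and are not the library's implementation; in particular, disposing a value that is assigned after the container has been disposed is an assumption borrowed from the classic Rx `SerialDisposable`.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in; not the library's implementation.
public sealed class MiniSerialDisposable : IAsyncDisposable
{
    private readonly object _gate = new();
    private IAsyncDisposable? _current;
    private bool _disposed;

    public async ValueTask SetDisposableAsync(IAsyncDisposable? value)
    {
        IAsyncDisposable? toDispose;
        lock (_gate)
        {
            if (_disposed)
            {
                toDispose = value; // container already disposed: dispose the newcomer
            }
            else
            {
                toDispose = _current; // swap in the new value, dispose the old one
                _current = value;
            }
        }
        // Dispose outside the lock so slow cleanup never blocks other callers.
        if (toDispose is not null) await toDispose.DisposeAsync();
    }

    public async ValueTask DisposeAsync()
    {
        IAsyncDisposable? toDispose;
        lock (_gate)
        {
            _disposed = true;
            toDispose = _current;
            _current = null;
        }
        if (toDispose is not null) await toDispose.DisposeAsync();
    }
}

// Records whether it has been disposed, to make the behavior observable.
public sealed class TrackedResource : IAsyncDisposable
{
    public bool IsDisposed { get; private set; }

    public ValueTask DisposeAsync()
    {
        IsDisposed = true;
        return default;
    }
}
```

The swap happens under a lock while the actual `DisposeAsync` call is awaited outside it, a common way to keep state changes atomic without holding a lock across an await.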
SingleAssignmentDisposableAsync
Allows single assignment of async disposable.
public sealed class SingleAssignmentDisposableAsync : IAsyncDisposable
{
public bool IsDisposed { get; }
public IAsyncDisposable? GetDisposable();
public ValueTask SetDisposableAsync(IAsyncDisposable? value);
public ValueTask DisposeAsync();
}
Use Case: When a resource must be assigned exactly once.
13. Subjects and Multicasting
SubjectAsync
Async subject for multicasting.
public static class SubjectAsync
{
public static ISubjectAsync<T> Create<T>();
public static ISubjectAsync<T> CreateBehavior<T>(T initialValue);
public static ISubjectAsync<T> CreateReplayLatest<T>();
}
ISubjectAsync
public interface ISubjectAsync<T> : IObservableAsync<T>, IObserverAsync<T>
{
IObservableAsync<T> Values { get; }
}
ConcurrentSubjectAsync
Forwards notifications to observers concurrently.
public sealed class ConcurrentSubjectAsync<T> : BaseSubjectAsync<T>
{
// Observers notified in parallel for high throughput
}
ConcurrentReplayLatestSubjectAsync
Replays latest value to new subscribers with concurrent notification.
ConcurrentStatelessSubjectAsync
Stateless subject with concurrent notification.
ConcurrentStatelessReplayLatestSubjectAsync
Stateless replay-latest subject with concurrent notification.
Multicasting Operators
// Publish with serial subject
public static ConnectableObservableAsync<T> Publish<T>(
this IObservableAsync<T> source)
// Publish with stateless subject
public static ConnectableObservableAsync<T> StatelessPublish<T>(
this IObservableAsync<T> source)
// Replay latest value
public static ConnectableObservableAsync<T> ReplayLatest<T>(
this IObservableAsync<T> source)
// Auto-connect on first subscriber
public static IObservableAsync<T> RefCount<T>(
this ConnectableObservableAsync<T> source)
// Custom subject factory
public static ConnectableObservableAsync<T> Multicast<T, TSubject>(
this IObservableAsync<T> source,
Func<TSubject> subjectFactory)
where TSubject : ISubjectAsync<T>
Example:
var source = ObservableAsync.Interval(TimeSpan.FromMilliseconds(100)).Take(5);
// Publish + explicit connect
var published = source.Publish();
await using var sub1 = await published.SubscribeAsync(
async (v, ct) => Console.WriteLine($"Sub1: {v}"), CancellationToken.None);
await using var sub2 = await published.SubscribeAsync(
async (v, ct) => Console.WriteLine($"Sub2: {v}"), CancellationToken.None);
await using var connection = await published.ConnectAsync(CancellationToken.None);
// RefCount: auto-connect/disconnect
var shared = source.Publish().RefCount();
// ReplayLatest: new subscribers get most recent value
var replayed = source.ReplayLatest().RefCount();
14. Bridging Classic and Async Observables¶
ObservableBridgeExtensions¶
Provides bidirectional conversion between IObservable<T>, IObservableAsync<T>, and IAsyncEnumerable<T>.
// IObservable to IObservableAsync
public static IObservableAsync<T> ToObservableAsync<T>(
this IObservable<T> observable)
// IObservableAsync to IObservable
public static IObservable<T> ToObservable<T>(
this IObservableAsync<T> asyncObservable)
// IAsyncEnumerable to IObservableAsync
public static IObservableAsync<T> ToObservableAsync<T>(
this IAsyncEnumerable<T> asyncEnumerable)
// IObservableAsync to IAsyncEnumerable
public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(
this IObservableAsync<T> asyncObservable)
Example:
// Bridge from classic IObservable
var classic = Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(5);
var asyncVersion = classic.ToObservableAsync();
// Bridge back to classic
var backToClassic = asyncVersion.ToObservable();
// From IAsyncEnumerable
async IAsyncEnumerable<int> GenerateAsync()
{
for (int i = 0; i < 5; i++)
{
await Task.Delay(50);
yield return i;
}
}
var fromAsyncEnum = GenerateAsync().ToObservableAsync();
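Conceptually, bridging a pull-based `IAsyncEnumerable<T>` into a push-based pipeline means iterating the sequence and forwarding each item to a handler. The sketch below shows that idea with BCL types only. `BridgeSketch.PumpAsync` is a hypothetical helper, not the implementation behind `ToObservableAsync`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BridgeSketch
{
    // Pulls items one at a time and pushes each to the handler.
    // The next item is not requested until the handler has completed.
    public static async Task PumpAsync<T>(
        IAsyncEnumerable<T> source,
        Func<T, CancellationToken, ValueTask> onNext,
        CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            await onNext(item, cancellationToken);
        }
    }
}
```

Because the loop awaits the handler before asking for the next item, a slow consumer naturally slows the producer, which is the property an async-native bridge is meant to preserve.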
15. Classic Reactive Extensions Operators¶
ReactiveUI.Extensions also provides valuable operators for traditional IObservable<T> that don't ship with System.Reactive.
Null & Signal Helpers¶
// Filter nulls
public static IObservable<T> WhereIsNotNull<T>(
this IObservable<T?> source)
where T : class
// Convert to Unit signal
public static IObservable<Unit> AsSignal<T>(
this IObservable<T> source)
Timing & Scheduling¶
// Shared timer (one underlying timer per TimeSpan)
public static IObservable<long> SyncTimer(TimeSpan period)
// Schedule single value
public static IObservable<T> Schedule<T>(
this T value,
TimeSpan dueTime,
IScheduler scheduler)
// Safe scheduling (handles null scheduler)
public static IDisposable ScheduleSafe(
this IScheduler? scheduler,
Action action)
// ThrottleFirst: allow first item per window
public static IObservable<T> ThrottleFirst<T>(
this IObservable<T> source,
TimeSpan window,
IScheduler? scheduler = null)
// ThrottleDistinct: throttle but only emit on change
public static IObservable<T> ThrottleDistinct<T>(
this IObservable<T> source,
TimeSpan throttle,
IScheduler? scheduler = null)
// DebounceImmediate: emit first immediately then debounce
public static IObservable<T> DebounceImmediate<T>(
this IObservable<T> source,
TimeSpan dueTime,
IScheduler? scheduler = null)
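To make the windowing behaviour of ThrottleFirst concrete, the sketch below replays a list of timestamped events and keeps the first item in each window. It is a pure function for illustration, not the library operator. It assumes that a window opens when an item is emitted and that items arriving before the window closes are dropped; `ThrottleFirstSketch` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

public static class ThrottleFirstSketch
{
    // Keeps the first event of each window and drops the rest.
    public static List<T> Apply<T>(
        IEnumerable<(TimeSpan At, T Value)> events,
        TimeSpan window)
    {
        var kept = new List<T>();
        TimeSpan? windowEnd = null;

        foreach (var (at, value) in events)
        {
            if (windowEnd is null || at >= windowEnd.Value)
            {
                kept.Add(value);
                windowEnd = at + window; // a new window starts at this item
            }
        }

        return kept;
    }
}
```

For events a, b, c, d, e arriving at 0, 40, 120, 150 and 300 ms with a 100 ms window, the sketch keeps a, c and e.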
Inactivity / Liveness¶
// Heartbeat during quiet periods
public static IObservable<IHeartbeat<T>> Heartbeat<T>(
this IObservable<T> source,
TimeSpan interval,
IScheduler? scheduler = null)
// Detect stale data
public static IObservable<IStale<T>> DetectStale<T>(
this IObservable<T> source,
TimeSpan staleThreshold,
IScheduler? scheduler = null)
// Buffer until inactive
public static IObservable<IList<T>> BufferUntilInactive<T>(
this IObservable<T> source,
TimeSpan inactivityPeriod,
IScheduler? scheduler = null)
Error Handling¶
// Ignore all errors
public static IObservable<T> CatchIgnore<T>(
this IObservable<T> source)
// Return fallback on error
public static IObservable<T> CatchAndReturn<T>(
this IObservable<T> source,
T fallback)
// Retry with error handler
public static IObservable<T> OnErrorRetry<T, TException>(
this IObservable<T> source,
Action<TException> onError,
int retryCount = int.MaxValue,
TimeSpan delay = default,
IScheduler? delayScheduler = null)
where TException : Exception
// Retry with exponential backoff
public static IObservable<T> RetryWithBackoff<T>(
this IObservable<T> source,
int maxRetries,
TimeSpan initialDelay,
double backoffFactor = 2.0,
TimeSpan? maxDelay = null,
IScheduler? scheduler = null)
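The parameters of RetryWithBackoff suggest a delay schedule that grows geometrically and is optionally capped. The sketch below computes such a schedule under that assumption. The exact formula used by the library is not confirmed here, and `BackoffSketch.DelayFor` is a hypothetical helper.

```csharp
using System;

public static class BackoffSketch
{
    // attempt is zero-based: attempt 0 waits initialDelay.
    public static TimeSpan DelayFor(
        int attempt,
        TimeSpan initialDelay,
        double backoffFactor = 2.0,
        TimeSpan? maxDelay = null)
    {
        var milliseconds = initialDelay.TotalMilliseconds * Math.Pow(backoffFactor, attempt);

        if (maxDelay is TimeSpan cap)
        {
            // Never wait longer than the configured ceiling.
            milliseconds = Math.Min(milliseconds, cap.TotalMilliseconds);
        }

        return TimeSpan.FromMilliseconds(milliseconds);
    }
}
```

With an initial delay of 100 ms and the default factor, the sketch yields 100 ms, 200 ms, 400 ms and 800 ms for attempts 0 to 3. Supplying a cap keeps long retry chains from waiting unbounded amounts of time.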
Combining & Aggregation¶
// All values are true
public static IObservable<bool> CombineLatestValuesAreAllTrue(
this IEnumerable<IObservable<bool>> sources)
// All values are false
public static IObservable<bool> CombineLatestValuesAreAllFalse(
this IEnumerable<IObservable<bool>> sources)
// Get max value
public static IObservable<T> GetMax<T>(
this IObservable<T> source)
// Get min value
public static IObservable<T> GetMin<T>(
this IObservable<T> source)
// Partition into two streams
public static (IObservable<T> True, IObservable<T> False) Partition<T>(
this IObservable<T> source,
Func<T, bool> predicate)
Logical / Boolean¶
// Boolean negation
public static IObservable<bool> Not(
this IObservable<bool> source)
// Filter true values
public static IObservable<bool> WhereTrue(
this IObservable<bool> source)
// Filter false values
public static IObservable<bool> WhereFalse(
this IObservable<bool> source)
Async / Task Integration¶
// Sequential async projection
public static IObservable<TResult> SelectAsyncSequential<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, Task<TResult>> selector)
// Latest only (cancels previous)
public static IObservable<TResult> SelectLatestAsync<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, Task<TResult>> selector)
// Limited parallelism
public static IObservable<TResult> SelectAsyncConcurrent<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, Task<TResult>> selector,
int maxConcurrency)
// Async subscription
public static IDisposable SubscribeAsync<T>(
this IObservable<T> source,
Func<T, Task> onNext)
// Synchronous gate
public static IDisposable SubscribeSynchronous<T>(
this IObservable<T> source,
Func<T, Task> onNext)
// Convert to hot Task
public static Task<T> ToHotTask<T>(
this IObservable<T> source)
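The limited-parallelism idea behind SelectAsyncConcurrent can be sketched with a SemaphoreSlim over a plain collection. This illustrates the technique only; it is not the library's implementation, and `ConcurrencySketch.SelectConcurrentAsync` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrencySketch
{
    // Runs the selector for every item, with at most maxConcurrency in flight.
    // Results are returned in input order.
    public static async Task<TResult[]> SelectConcurrentAsync<TSource, TResult>(
        IEnumerable<TSource> items,
        Func<TSource, Task<TResult>> selector,
        int maxConcurrency)
    {
        using var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency);

        var tasks = items.Select(async item =>
        {
            await gate.WaitAsync();      // wait for a free slot
            try
            {
                return await selector(item);
            }
            finally
            {
                gate.Release();          // free the slot even on failure
            }
        }).ToArray();

        return await Task.WhenAll(tasks);
    }
}
```

The same gate with a capacity of 1 gives sequential behaviour, which is one way to think about the relationship between SelectAsyncSequential and SelectAsyncConcurrent.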
Backpressure¶
// Conflate bursty updates
public static IObservable<T> Conflate<T>(
this IObservable<T> source,
TimeSpan minimumPeriod,
IScheduler? scheduler = null)
Filtering / Conditional¶
// Filter strings by regex
public static IObservable<string> Filter(
this IObservable<string> source,
string regexPattern)
// Take until predicate (inclusive)
public static IObservable<T> TakeUntil<T>(
this IObservable<T> source,
Func<T, bool> predicate)
// Wait for first match
public static IObservable<T> WaitUntil<T>(
this IObservable<T> source,
Func<T, bool> predicate)
// Sample latest on trigger
public static IObservable<T> SampleLatest<T>(
this IObservable<T> source,
IObservable<Unit> trigger)
// Fallback if empty
public static IObservable<T> SwitchIfEmpty<T>(
this IObservable<T> source,
IObservable<T> fallback)
// Drop if busy
public static IObservable<T> DropIfBusy<T>(
this IObservable<T> source,
Func<T, Task> asyncAction)
Buffering & Transformation¶
// Buffer until delimiter
public static IObservable<IList<T>> BufferUntil<T>(
this IObservable<T> source,
T startDelimiter,
T endDelimiter)
// Buffer until idle
public static IObservable<IList<T>> BufferUntilIdle<T>(
this IObservable<T> source,
TimeSpan idlePeriod)
// Emit consecutive pairs
public static IObservable<(T Previous, T Current)> Pairwise<T>(
this IObservable<T> source)
// Scan with initial value
public static IObservable<TAccumulate> ScanWithInitial<TSource, TAccumulate>(
this IObservable<TSource> source,
TAccumulate initial,
Func<TAccumulate, TSource, TAccumulate> accumulator)
// Shuffle arrays in-place
public static IObservable<T[]> Shuffle<T>(
this IObservable<T[]> source)
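The pairing logic of Pairwise is easy to show over an ordinary sequence. The sketch below applies it to an `IEnumerable<T>` rather than an observable, and assumes that the first element produces no output on its own because it has no predecessor. `PairwiseSketch` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

public static class PairwiseSketch
{
    // Yields (previous, current) for every element after the first.
    public static IEnumerable<(T Previous, T Current)> Pairwise<T>(IEnumerable<T> source)
    {
        using var enumerator = source.GetEnumerator();
        if (!enumerator.MoveNext())
        {
            yield break; // empty input, no pairs
        }

        var previous = enumerator.Current;
        while (enumerator.MoveNext())
        {
            yield return (previous, enumerator.Current);
            previous = enumerator.Current;
        }
    }
}
```

For the input 1, 2, 3, 4 the sketch yields (1, 2), (2, 3) and (3, 4), which is the shape needed to compute deltas between consecutive readings.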
Subscription / Side Effects¶
// Action on subscribe
public static IObservable<T> DoOnSubscribe<T>(
this IObservable<T> source,
Action action)
// Action on dispose
public static IObservable<T> DoOnDispose<T>(
this IObservable<T> source,
Action disposeAction)
Utility & Miscellaneous¶
// ForEach with low allocations
public static IObservable<T> ForEach<T>(
this IObservable<IEnumerable<T>> source)
// Using helper
public static IObservable<TResult> Using<TDisposable, TResult>(
this TDisposable obj,
Func<TDisposable, TResult> function,
IScheduler? scheduler = null)
where TDisposable : IDisposable
// While loop
public static IObservable<Unit> While(
Func<bool> condition,
Action action,
IScheduler? scheduler = null)
// OnNext with params
public static void OnNext<T>(
this IObserver<T> observer,
params T[] values)
// Read-only BehaviorSubject
public static (IObservable<T> Observable, IObserver<T> Observer) ToReadOnlyBehavior<T>(
T initialValue)
// Property change observable
public static IObservable<TProperty> ToPropertyObservable<TSource, TProperty>(
this TSource source,
Expression<Func<TSource, TProperty>> propertyExpression)
where TSource : INotifyPropertyChanged
16. Advanced Multi-Threading Examples¶
Example 1: Parallel Data Processing Pipeline¶
using ReactiveUI.Extensions.Async;
using ReactiveUI.Extensions.Async.Subjects;
public class ParallelDataProcessor
{
private readonly int _maxConcurrency;
public ParallelDataProcessor(int maxConcurrency = 4)
{
_maxConcurrency = maxConcurrency;
}
public async Task ProcessDataStreamAsync(
IAsyncEnumerable<string> input,
Func<string, CancellationToken, Task<string>> processor,
CancellationToken cancellationToken)
{
// Convert to async observable
var observable = input.ToObservableAsync();
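// Process with limited concurrency (assumes the SelectMany overload that accepts
// maxConcurrency, as used under Best Practices)
var processed = observable
.SelectMany(async (item, ct) =>
{
try
{
return await processor(item, ct);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
// Log and map failures to null so one bad item does not end the stream;
// cancellation is left to propagate
Console.WriteLine($"Error processing {item}: {ex.Message}");
return null;
}
}, maxConcurrency: _maxConcurrency)
.Where(x => x != null);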
// Subscribe and process results
await processed.ForEachAsync(async (result, ct) =>
{
await SaveResultAsync(result!, ct);
}, cancellationToken);
}
private async Task SaveResultAsync(string result, CancellationToken ct)
{
// Simulate async I/O
await Task.Delay(10, ct);
Console.WriteLine($"Saved: {result}");
}
}
// Usage
var processor = new ParallelDataProcessor(maxConcurrency: 4);
var data = GenerateDataAsync(); // IAsyncEnumerable<string>
await processor.ProcessDataStreamAsync(
data,
async (item, ct) =>
{
await Task.Delay(100, ct); // Simulate processing
return item.ToUpperInvariant();
},
CancellationToken.None);
Example 2: Real-Time Data Aggregation with Multiple Sources¶
public class RealTimeAggregator
{
public async Task AggregateMultipleSourcesAsync(
CancellationToken cancellationToken)
{
// Create multiple data sources
var source1 = ObservableAsync.Interval(TimeSpan.FromMilliseconds(100))
.Take(50)
.Select(x => $"Source1-{x}");
var source2 = ObservableAsync.Interval(TimeSpan.FromMilliseconds(150))
.Take(50)
.Select(x => $"Source2-{x}");
var source3 = ObservableAsync.Interval(TimeSpan.FromMilliseconds(200))
.Take(50)
.Select(x => $"Source3-{x}");
// Merge all sources with concurrency limit
var merged = ObservableAsync.Merge(
new[] { source1, source2, source3 },
maxConcurrency: 3);
// Buffer and process in batches
var batches = merged
.Buffer(TimeSpan.FromMilliseconds(500))
.Where(batch => batch.Count > 0);
// Batches are handled one at a time; items within each batch run in parallel
await batches.ForEachAsync(async (batch, ct) =>
{
Console.WriteLine($"Processing batch of {batch.Count} items");
// Process batch items in parallel
var tasks = batch.Select(async item =>
{
await Task.Delay(10, ct);
return item.ToUpperInvariant();
});
var results = await Task.WhenAll(tasks);
foreach (var result in results)
{
Console.WriteLine($" {result}");
}
}, cancellationToken);
}
}
Example 3: Cancellation-Cooperative Long-Running Operation¶
public class CancellableDataFetcher
{
public async Task FetchDataWithCancellationAsync(
string url,
CancellationToken cancellationToken)
{
var fetchObservable = ObservableAsync.Create<string>(async (observer, ct) =>
{
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, cancellationToken);
try
{
for (int i = 0; i < 100 && !linkedCts.Token.IsCancellationRequested; i++)
{
// Simulate async HTTP request
var data = await FetchPageAsync(url, i, linkedCts.Token);
await observer.OnNextAsync(data, linkedCts.Token);
await Task.Delay(100, linkedCts.Token);
}
await observer.OnCompletedAsync(Result.Success);
}
catch (OperationCanceledException)
{
await observer.OnCompletedAsync(Result.Success);
}
catch (Exception ex)
{
await observer.OnErrorResumeAsync(ex, linkedCts.Token);
}
finally
{
linkedCts.Dispose();
}
return DisposableAsync.Empty;
});
// Subscribe with cancellation
await using var subscription = await fetchObservable.SubscribeAsync(
async (data, ct) =>
{
await ProcessDataAsync(data, ct);
},
cancellationToken);
}
private async Task<string> FetchPageAsync(string url, int page, CancellationToken ct)
{
await Task.Delay(50, ct); // Simulate network
return $"Page {page} data";
}
private async Task ProcessDataAsync(string data, CancellationToken ct)
{
await Task.Delay(10, ct); // Simulate processing
Console.WriteLine($"Processed: {data}");
}
}
Example 4: Thread-Safe State Management with Async Observables¶
public class ThreadSafeStateManager<T> : IAsyncDisposable
{
private readonly ConcurrentSubjectAsync<T> _subject = new();
private T? _currentState;
private readonly SemaphoreSlim _lock = new(1, 1);
public IObservableAsync<T> State => _subject;
public async ValueTask UpdateStateAsync(T newState, CancellationToken cancellationToken)
{
await _lock.WaitAsync(cancellationToken);
try
{
_currentState = newState;
await _subject.OnNextAsync(newState, cancellationToken);
}
finally
{
_lock.Release();
}
}
public async ValueTask<T> GetStateAsync(CancellationToken cancellationToken)
{
await _lock.WaitAsync(cancellationToken);
try
{
return _currentState!;
}
finally
{
_lock.Release();
}
}
public async ValueTask DisposeAsync()
{
await _lock.WaitAsync();
try
{
await _subject.OnCompletedAsync(Result.Success);
await _subject.DisposeAsync();
}
finally
{
_lock.Release();
_lock.Dispose();
}
}
}
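// Usage
var stateManager = new ThreadSafeStateManager<int>();
// Start five concurrent subscribers; ToArray() forces the lazy query to run now
var tasks = Enumerable.Range(0, 5).Select(async i =>
{
await using var sub = await stateManager.State.SubscribeAsync(
async (state, ct) =>
{
Console.WriteLine($"Subscriber {i}: State = {state}");
},
CancellationToken.None);
await Task.Delay(1000); // Keep the subscription alive while updates arrive
}).ToArray();
// Give the subscriptions a moment to attach, then update while they are active
await Task.Delay(100);
await stateManager.UpdateStateAsync(42, CancellationToken.None);
// Wait for the subscribers to finish, then release the manager
await Task.WhenAll(tasks);
await stateManager.DisposeAsync();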
Example 5: Backpressure Handling with Conflation¶
public class BackpressureHandler
{
public async Task HandleHighFrequencyDataAsync(
CancellationToken cancellationToken)
{
// High-frequency source (1000 events/second)
var highFrequency = ObservableAsync.Interval(TimeSpan.FromMilliseconds(1))
.Take(10000);
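// Conflate to at most one event per 10 ms (about 100 events/second), keeping the latest.
// Rx-style Throttle is a debounce: it emits only after a quiet period, so a source
// that fires every 1 ms would be suppressed until it stops. Bridge to the classic
// Conflate operator from section 15 and back instead.
var conflated = highFrequency
.ToObservable()
.Conflate(TimeSpan.FromMilliseconds(10))
.ToObservableAsync();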
// Process at manageable rate
await conflated.ForEachAsync(async (value, ct) =>
{
await Task.Delay(5, ct); // Simulate processing
Console.WriteLine($"Processed: {value}");
}, cancellationToken);
}
}
17. Performance Considerations¶
ValueTask vs Task¶
ReactiveUI.Extensions uses ValueTask throughout for better performance:
// ValueTask avoids heap allocation for synchronous completions
public ValueTask OnNextAsync(T value, CancellationToken cancellationToken)
Benefits:
- No heap allocation when operation completes synchronously
- Reduced GC pressure
- Better performance for hot paths
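The allocation benefit comes from methods that can often finish without awaiting anything. The sketch below shows the pattern with a cache: a hit returns a `ValueTask<int>` built directly from the value, while a miss falls back to a real `Task`. `CachedLookup` and its members are hypothetical names used for illustration and are not part of the library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class CachedLookup
{
    private readonly ConcurrentDictionary<string, int> _cache = new();

    public ValueTask<int> GetLengthAsync(string key)
    {
        if (_cache.TryGetValue(key, out var cached))
        {
            // Synchronous path: the result is wrapped in a struct, no Task is allocated.
            return new ValueTask<int>(cached);
        }

        // Asynchronous path: a Task is allocated only when real waiting is needed.
        return new ValueTask<int>(LoadAsync(key));
    }

    private async Task<int> LoadAsync(string key)
    {
        await Task.Delay(10); // stand-in for real I/O
        return _cache[key] = key.Length;
    }
}
```

A `ValueTask` should be awaited at most once and not stored for later reuse; callers that need to await a result several times can convert it with `AsTask()`.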
Allocation-Aware Operators¶
Many operators are designed to minimize allocations:
// ForEach with low allocations
public static IObservable<T> ForEach<T>(
this IObservable<IEnumerable<T>> source)
Threading and Concurrency¶
Best Practices:
- 1. Use ConcurrentSubjectAsync for high throughput:
var subject = new ConcurrentSubjectAsync<string>();
// Observers notified in parallel
- 2. Limit concurrency with Merge:
var merged = sources.Merge(maxConcurrency: 4);
- 3. Use ObserveOn for context switching:
var onUiThread = source.ObserveOn(SynchronizationContext.Current!);
Memory Management¶
Dispose Properly:
// Always use await using for async disposables
await using var subscription = await observable.SubscribeAsync(...);
// Or use CompositeDisposableAsync for multiple resources
var composite = new CompositeDisposableAsync();
await composite.AddAsync(subscription1);
await composite.AddAsync(subscription2);
await composite.DisposeAsync();
18. Best Practices¶
1. Always Pass CancellationToken¶
// GOOD
await observable.ForEachAsync(async (item, ct) =>
{
await ProcessAsync(item, ct);
}, cancellationToken);
// BAD - No cancellation support
await observable.ForEachAsync(async item =>
{
await ProcessAsync(item, CancellationToken.None);
});
2. Use Async Disposables Properly¶
// GOOD
await using var subscription = await observable.SubscribeAsync(...);
// BAD - Synchronous disposal of async resources
using (await observable.SubscribeAsync(...)) { }
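The difference matters because a plain `using` statement only calls `Dispose()`, and a type that implements only `IAsyncDisposable` cannot be used there at all. The sketch below uses a hypothetical `TrackedResource` type (not a library type) to show that `await using` awaits `DisposeAsync()` before execution continues past the block.

```csharp
using System;
using System.Threading.Tasks;

public sealed class TrackedResource : IAsyncDisposable
{
    public bool Disposed { get; private set; }

    public async ValueTask DisposeAsync()
    {
        await Task.Yield(); // simulate asynchronous cleanup
        Disposed = true;
    }
}

public static class DisposalSketch
{
    public static async Task<TrackedResource> UseAsync()
    {
        var resource = new TrackedResource();

        await using (resource)
        {
            // Work with the resource here.
        }

        // DisposeAsync has been awaited by the time this line runs.
        return resource;
    }
}
```

Because the cleanup is awaited, any asynchronous teardown such as flushing buffers or unsubscribing from an upstream source has finished before the caller moves on.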
3. Handle Errors in OnErrorResumeAsync¶
public class ResilientObserver<T> : ObserverAsync<T>
{
protected override async ValueTask OnErrorResumeAsyncCore(
Exception error,
CancellationToken cancellationToken)
{
// Log error
await LogErrorAsync(error, cancellationToken);
// Decide whether to continue or stop
// Don't rethrow unless you want to terminate
}
}
4. Use Appropriate Subjects¶
// For simple multicasting
var subject = SubjectAsync.Create<T>();
// For high-throughput scenarios
var subject = new ConcurrentSubjectAsync<T>();
// For replaying latest value
var subject = SubjectAsync.CreateReplayLatest<T>();
5. Limit Concurrency¶
// GOOD - Limited concurrency
var processed = source.SelectMany(
async item => await ProcessAsync(item),
maxConcurrency: 4);
// BAD - Unlimited concurrency
var processed = source.SelectMany(
async item => await ProcessAsync(item));
6. Use ObserveOn for UI Updates¶
// Ensure UI updates happen on UI thread
await using var uiUpdates = await source
.ObserveOn(SynchronizationContext.Current!)
.SubscribeAsync(async (value, ct) =>
{
UpdateUI(value);
}, CancellationToken.None);
7. Bridge Carefully Between Classic and Async¶
// Convert classic to async when needed
var asyncObservable = classicObservable.ToObservableAsync();
// Convert back when interoperating with Rx.NET libraries
var classic = asyncObservable.ToObservable();
8. Test with TestScheduler¶
// Use TestScheduler for deterministic testing
var testScheduler = new TestScheduler();
var source = ObservableAsync.Interval(TimeSpan.FromSeconds(1), testScheduler);
// Advance time deterministically
testScheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
Conclusion¶
ReactiveUI.Extensions represents a significant advancement in reactive programming for .NET, providing a fully async-native framework that addresses the limitations of traditional Rx.NET in modern async/await scenarios. By embracing ValueTask, CancellationToken, and IAsyncDisposable throughout the entire pipeline, it enables:
- True end-to-end async without thread blocking
- Proper cancellation support at every level
- Async resource cleanup for I/O-bound operations
- Seamless interop with both classic Rx.NET and modern async patterns
- High performance through allocation-aware design
The library's comprehensive operator set, combined with advanced features like concurrent subjects, async disposables, and bidirectional bridging, makes it an essential tool for building robust, scalable reactive applications in .NET 8 and beyond.
Whether you're building real-time data processing pipelines, handling high-frequency event streams, or simply need better async integration in your reactive code, ReactiveUI.Extensions provides the tools and patterns to succeed.
Additional Resources¶
- GitHub Repository: https://github.com/reactiveui/Extensions
- System.Reactive Documentation: https://github.com/dotnet/reactive
- ReactiveUI Documentation: https://www.reactiveui.net/docs/
- Introduction to Rx.NET eBook: Free 2nd Edition available
This article covers all public functions within the ReactiveUI.Extensions library as of version 2.2.x. For the most up-to-date information, please refer to the official GitHub repository.