Problem
Part of your solution uses Rx observables, and part of your solution uses dataflow meshes, and you need them to communicate.
Rx observables and dataflow meshes each have their own uses, with some conceptual overlap; this recipe shows how easily they work together so you can use the best tool for each part of the job.
Solution
First, let’s consider using a dataflow block as an input to an observable stream. The following code creates a buffer block (which does no processing) and creates an observable interface from that block by calling AsObservable:
var buffer = new BufferBlock<int>();
IObservable<int> integers = buffer.AsObservable();
integers.Subscribe(data => Trace.WriteLine(data), ex => Trace.WriteLine(ex),
() => Trace.WriteLine("Done"));
buffer.Post(13);
Buffer blocks and observable streams can be completed normally or with error, and the AsObservable method will translate the block completion (or fault) into the completion of the observable stream. However, if the block faults with an exception, that exception will be wrapped in an AggregateException when it is passed to the observable stream.
This is similar to how linked blocks propagate their faults.
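The fault translation described above can be sketched as follows. This is a minimal illustration, not part of the original recipe: it assumes the System.Reactive and TPL Dataflow packages are referenced, and the TaskCompletionSource named observed is a hypothetical helper that only exists so the example can wait for the asynchronous error notification.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var buffer = new BufferBlock<int>();
IObservable<int> integers = buffer.AsObservable();

// Hypothetical helper: captures the exception handed to OnError.
var observed = new TaskCompletionSource<Exception>();
integers.Subscribe(
    data => Trace.WriteLine(data),
    ex => observed.TrySetResult(ex),
    () => Trace.WriteLine("Done"));

// Fault is an explicit interface implementation, so a cast is required.
((IDataflowBlock)buffer).Fault(new InvalidOperationException("Oops"));

// Per the text above, the observer should see an AggregateException
// whose inner exception is the original InvalidOperationException.
Exception error = await observed.Task;
Trace.WriteLine(error.GetType().Name);
```

If your error handler needs the original exception, inspect the InnerException (or InnerExceptions) of the AggregateException rather than the wrapper itself.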
It is only a little more complicated to take a mesh and treat it as a destination for an observable stream. The following code calls AsObserver to allow a block to subscribe to an observable stream:
IObservable<DateTimeOffset> ticks =
    Observable.Interval(TimeSpan.FromSeconds(1))
        .Timestamp()
        .Select(x => x.Timestamp)
        .Take(5);
var display = new ActionBlock<DateTimeOffset>(x => Trace.WriteLine(x));
ticks.Subscribe(display.AsObserver());
try {
display.Completion.Wait();
Trace.WriteLine("Done.");
}
catch (Exception ex) {
Trace.WriteLine(ex);
}
Just as before, the completion of the observable stream is translated to the completion of the block, and any errors from the observable stream are translated to a fault of the block.
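The error path can be sketched in the same way. The example below is an illustration under assumptions rather than code from the recipe: it uses Observable.Throw to produce a stream that fails immediately, and the variable named caught is a hypothetical helper used only to record what the block's Completion task threw.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Threading.Tasks.Dataflow;

// An observable stream that ends with an error without producing any items.
IObservable<int> failing =
    Observable.Throw<int>(new InvalidOperationException("Oops"));

var display = new ActionBlock<int>(x => Trace.WriteLine(x));
failing.Subscribe(display.AsObserver());

Exception caught = null; // hypothetical helper for inspection
try
{
    // The observable's error faults the block, so awaiting Completion throws.
    await display.Completion;
    Trace.WriteLine("Done.");
}
catch (Exception ex)
{
    caught = ex;
    Trace.WriteLine(ex);
}
```

Awaiting Completion (instead of calling Wait, as the recipe's code does) avoids blocking a thread; the choice between the two depends on whether the calling code is asynchronous.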
Discussion
Dataflow blocks and observable streams share a lot of conceptual ground. They both have data pass through them, and they both understand completion and faults. They were designed for different scenarios; TPL Dataflow is intended for a mixture of asynchronous and parallel programming, while Rx is intended for reactive programming.
However, they overlap enough conceptually that they work well and naturally together.
See Also
Recipe 7.5 covers consuming observable streams with asynchronous code.
Recipe 7.6 covers using asynchronous code within an observable stream.
7.7. Rx Observables and Dataflow Meshes | 91
CHAPTER 8
Collections
Using the proper collections is essential in concurrent applications. I’m not talking about the standard collections like List<T>; I assume you already know about those. The purpose of this chapter is to introduce newer collections that are specifically intended for concurrent or asynchronous use.
Immutable collections are collection instances that can never change. At first glance, this sounds completely useless; but they’re actually very useful even in single-threaded, nonconcurrent applications. Read-only operations (such as enumeration) act directly on the immutable instance. Write operations (such as adding an item) return a new immutable instance instead of changing the existing instance. This is not as wasteful as it first sounds because most of the time immutable collections share most of their memory. Furthermore, immutable collections have the advantage of being implicitly safe to access from multiple threads; since they cannot change, they are threadsafe.
Immutable collections are in the Microsoft.Bcl.Immutable NuGet package.
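As a small sketch of the "write operations return a new instance" behavior described above, the following uses ImmutableStack<T>. It assumes the package exposes the System.Collections.Immutable namespace; the variable names are illustrative only.

```csharp
using System.Collections.Immutable;
using System.Diagnostics;

ImmutableStack<int> empty = ImmutableStack<int>.Empty;

// Push does not change the existing stack; it returns a new one.
ImmutableStack<int> one = empty.Push(13);
ImmutableStack<int> two = one.Push(7);

Trace.WriteLine(empty.IsEmpty); // the original instance is still empty
Trace.WriteLine(one.Peek());    // 13
Trace.WriteLine(two.Peek());    // 7
```

Because none of these instances can change after creation, any of them can be handed to another thread without synchronization.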
At the time of this writing, immutable collections are new, but they should be considered for all new development unless you need a mutable instance. If you’re not familiar with immutable collections, I recommend that you start with Recipe 8.1, even if you don’t need a stack or queue, because I’ll cover several common patterns that all immutable collections follow.
If you need to construct an immutable collection with lots of existing elements, there are special ways to do this efficiently; the example code in these recipes only adds elements one at a time. The MSDN documentation has details on how to efficiently construct immutable collections if you need to speed up your initialization. Table 8-1 details the platform availability of immutable collections.
Table 8-1. Immutable collections platform availability
Platform                    ImmutableStack<T>, etc.
.NET 4.5
.NET 4.0
Mono iOS/Droid
Windows Store
Windows Phone Apps 8.1
Windows Phone SL 8.0
Windows Phone SL 7.1
Silverlight 5
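For the bulk-construction case mentioned before Table 8-1, two approaches the library offers are sketched below: creating the collection from an existing sequence in one call, and filling a mutable builder before freezing it. This is only an illustration; the text defers to the MSDN documentation for details, and the sizes and variable names here are arbitrary.

```csharp
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;

// Option 1: build from an existing sequence in a single call.
ImmutableList<int> fromRange =
    ImmutableList.CreateRange(Enumerable.Range(0, 1000));

// Option 2: fill a mutable builder, then freeze it into an immutable list.
ImmutableList<int>.Builder builder = ImmutableList.CreateBuilder<int>();
for (int i = 0; i < 1000; i++)
    builder.Add(i);
ImmutableList<int> fromBuilder = builder.ToImmutable();

Trace.WriteLine(fromRange.Count);
Trace.WriteLine(fromBuilder.Count);
```

Either approach avoids creating a separate intermediate immutable instance for every added element.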
Threadsafe collections are mutable collection instances that can be changed by multiple threads simultaneously. Threadsafe collections use a mixture of fine-grained locks and lock-free techniques to ensure that threads are blocked for a minimal amount of time (and usually are not blocked at all). For many threadsafe collections, enumeration of the collection actually creates a snapshot of the collection and then enumerates that snapshot. The key advantage of threadsafe collections is that they can be accessed safely from multiple threads, yet the operations will only block your code for a short time, if at all. Table 8-2 details the platform availability of threadsafe collections.
Table 8-2. Threadsafe collections platform availability
Platform                    ConcurrentDictionary<TKey, TValue>, etc.
.NET 4.5
.NET 4.0
Mono iOS/Droid
Windows Store
Windows Phone Apps 8.1
Windows Phone SL 8.0
Windows Phone SL 7.1
Silverlight 5
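To illustrate several threads changing one threadsafe collection at once, here is a minimal sketch using ConcurrentDictionary<TKey, TValue>. The bucket scheme (i % 10) and the variable names are assumptions made up for the example.

```csharp
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

var counts = new ConcurrentDictionary<int, int>();

// Many threads update the same dictionary without any explicit locking.
Parallel.For(0, 1000, i =>
{
    // Insert 1 for a new key, or increment the existing value.
    counts.AddOrUpdate(i % 10, 1, (key, old) => old + 1);
});

// Each of the 10 buckets receives exactly 100 increments.
foreach (var pair in counts)
    Trace.WriteLine($"{pair.Key}: {pair.Value}");
```

Note that the update delegate passed to AddOrUpdate may run more than once under contention, so it should be free of side effects.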
Producer/consumer collections are mutable collection instances that are designed with a specific purpose in mind: to allow (possibly multiple) producers to push items to the collection while allowing (possibly multiple) consumers to pull items out of the collection. So they act as a bridge between producer code and consumer code, and also have an option to limit the number of items in the collection. Producer/consumer collections can have either a blocking or an asynchronous API. For example, when the collection is empty, a blocking producer/consumer collection will block the calling consumer thread until another item is added; but an asynchronous producer/consumer collection will allow the calling consumer thread to asynchronously wait until another item is added.
Table 8-3 details the platform availability of producer/consumer collections.
AsyncProducerConsumerQueue<T> and AsyncCollection<T> are in the Nito.AsyncEx NuGet package. BufferBlock<T> is in the Microsoft.Tpl.Dataflow NuGet package.
Table 8-3. Producer/consumer collections platform availability
Platform                    BlockingCollection<T>    BufferBlock<T>    AsyncProducerConsumerQueue<T>    AsyncCollection<T>
.NET 4.5
.NET 4.0
Mono iOS/Droid
Windows Store
Windows Phone Apps 8.1
Windows Phone SL 8.0
Windows Phone SL 7.1
Silverlight 5
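The difference between a blocking and an asynchronous producer/consumer API, described before Table 8-3, can be sketched with BlockingCollection<T> and BufferBlock<T>. This is a rough illustration only: the capacity of 2, the item counts, and the variable names are arbitrary assumptions, and each queue has a single producer and a single consumer.

```csharp
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Blocking API: Add blocks while the queue is full, and the consuming
// enumerable blocks the consumer thread while the queue is empty.
var blockingQueue = new BlockingCollection<int>(boundedCapacity: 2);
Task producer = Task.Run(() =>
{
    for (int i = 0; i < 5; i++)
        blockingQueue.Add(i);
    blockingQueue.CompleteAdding(); // lets the consumer loop finish
});
int blockingSum = 0;
foreach (int item in blockingQueue.GetConsumingEnumerable())
    blockingSum += item;
await producer;

// Asynchronous API: the producer and consumer wait without blocking threads.
var asyncQueue = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 2 });
Task asyncProducer = Task.Run(async () =>
{
    for (int i = 0; i < 5; i++)
        await asyncQueue.SendAsync(i);
    asyncQueue.Complete();
});
int asyncSum = 0;
while (await asyncQueue.OutputAvailableAsync())
    asyncSum += await asyncQueue.ReceiveAsync();
await asyncProducer;

Trace.WriteLine(blockingSum); // 0 + 1 + 2 + 3 + 4 = 10
Trace.WriteLine(asyncSum);    // 10
```

With more than one consumer on the BufferBlock<T>, another consumer could take the item between OutputAvailableAsync and ReceiveAsync, so this loop shape is only appropriate for the single-consumer case shown here.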
There are a number of different producer/consumer collections used in the recipes in this chapter, and different producer/consumer collections have different advantages.
Table 8-4 may be helpful in determining which one you should use.
Table 8-4. Producer/consumer collections
Feature                     BlockingCollection<T>    BufferBlock<T>    AsyncProducerConsumerQueue<T>    AsyncCollection<T>
Queue semantics
Stack/bag semantics
Synchronous API
Asynchronous API