What is Rx?
Rx can be summarized in the following sentence, which can also be read on the Data Developer Center homepage:
Rx is a library for composing asynchronous and event-based programs using observable collections.
Three core properties are reflected in this sentence, all of which will be addressed throughout this lab:
·
Asynchronous and event-based – As
reflected in the title, the bread and butter of Rx’s mission statement is to
simplify those programming models. Everyone knows what stuck user interfaces
look like, both on the Windows platform and on the web. And with the cloud
around the corner, asynchrony becomes quintessential. Low-level technologies
like .NET events, the asynchronous pattern, tasks, AJAX, etc. are often too
hard.
·
Composition – Combining asynchronous
computations today is way too hard. It involves a lot of plumbing code that has
little to nothing to do with the problem being solved. In particular, the data
flow of the operations involved in the problem is not clear at all, and code
gets spread out throughout event handlers, asynchronous callback procedures,
and whatnot.
·
Observable collections – By looking at
asynchronous computations as data sources, we can leverage the active knowledge
of LINQ’s programming model. That’s right: your mouse is a database of mouse
moves and clicks. In the world of Rx, such asynchronous data sources are
composed using various operators in the LINQ sense, allowing things like
filters, projections, joins, time-based operations, etc.
Lab flow
In this lab, we’ll explore Rx in a gradual manner. First, we’ll have a look at the core interfaces of Rx, which ship in .NET 4’s Base Class Library. Once we understand those interfaces (and their relationship with IEnumerable<T>), we’ll move on to show how the Rx libraries allow for creating simple observable sequences using factory methods. This allows for some basic experimentation. As we proceed, we’ll introduce how to bridge with existing asynchronous event sources such as .NET events and the asynchronous pattern. Showing more query operators as we move along, we’ll end up at a point where we start to compose multiple asynchronous sources, unleashing the true power of Rx.
Exercise 1 – Getting started with Rx interfaces and
assemblies
Objective: Acquire basic knowledge of the IObservable<T> and IObserver<T> interfaces that ship in the .NET 4 BCL (Base Class Library) and of the role of the System.Reactive assembly in the Rx release.
1.
Open Visual Studio 2010 and go
to File, New, Project… to create a new Console Application project in C#. Make
sure the .NET Framework 4 target is set in the dropdown box at the top of the
dialog.
2.
In the Program.Main method in
the Program.cs source file, explore the System namespace by typing the
following fragment of code:
System.IObs
This shows the two core interfaces around which Rx is built. Starting from .NET 4, those interfaces are built in to the Base Class Library’s mscorlib.dll assembly.
Note: On other platforms supported by Rx (including .NET 3.5 SP1 and Silverlight), a separate assembly called System.Observable.dll has to be included in the project in order to get access to those two interfaces. This assembly gets installed in the same location as the other assemblies we’ll talk about further on in this first exercise.
3.
To start our exploration of those interfaces, we
should have a look at them. Use the Object Browser (available through the View
menu) or use both interfaces in a piece of code and use Go to Definition from
the context menu (or press F12). We’ll follow the latter route here as it
lets us indicate the role of both interfaces in an informal manner using variable
names:
IObservable<int> source;
IObserver<int> handler;
The interfaces should look as shown below:
public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
public interface IObserver<in T>
{
    void OnCompleted();
    void OnError(Exception error);
    void OnNext(T value);
}
Both interfaces serve a complementary role. The IObservable<T> interface acts as a data source that can be observed, meaning it can send data to
everyone who’s interested in hearing about it. Those interested parties are
represented by the IObserver<T> interface.
In order to receive notifications from an observable collection, one uses the Subscribe method to hand it an IObserver<T> object. In return for this
observer, the Subscribe method returns an IDisposable object that acts as a
handle for the subscription. Calling Dispose on this object will detach the
observer from the source such that notifications are no longer delivered.
Similarities with the += and -= operators used for .NET events are not
accidental, but the Subscribe method provides more flexibility. In particular,
an unsubscribe action can result in quite some bookkeeping, all of which is
handled by the Rx framework.
Observers support three notifications, reflected by the interface’s methods. OnNext can be called zero or more times, when the observable data source has data available. For example, an observable data source used for mouse move events could send out a Point object every time the mouse has moved. The other two methods are used to signal successful or exceptional termination.
Background: Those two interfaces are the dual of IEnumerable<T> and
IEnumerator<T>. While this deep duality may sound frightening, it’s a
source of much beauty. First of all, the property of duality between those
interfaces can be explained in two simple English words: push versus pull. Where an observable data source pushes data at
its observers, enumerable data sources are being pulled by an enumerator to
receive data (typically using the foreach language construct). As a result of
this duality, all of the LINQ operators that apply in one world have a
corresponding use in the other world. We’ll see this illustrated in later
exercises.
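To make the push versus pull contrast concrete, here is a minimal sketch that uses only the BCL interfaces. PushSource and ConsoleObserver are hypothetical helper types written for this illustration (they are not part of Rx), and hand-implementing the interfaces like this is exactly the work the Rx library normally spares us from.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Rx): pushes a fixed set of values at its observer.
public class PushSource : IObservable<int>
{
    private readonly int[] _values;

    public PushSource(params int[] values) { _values = values; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Push: the source decides when the observer receives each value.
        foreach (int value in _values)
            observer.OnNext(value);

        // Signal successful termination; no further messages follow.
        observer.OnCompleted();
        return new NoOpSubscription();
    }

    // Everything was already delivered inside Subscribe, so there is nothing to detach.
    private sealed class NoOpSubscription : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical helper: an observer that logs the three kinds of notifications.
public class ConsoleObserver : IObserver<int>
{
    public void OnNext(int value) { Console.WriteLine("Pushed: {0}", value); }
    public void OnError(Exception error) { Console.WriteLine("Error: {0}", error.Message); }
    public void OnCompleted() { Console.WriteLine("Done pushing"); }
}

public class Program
{
    public static void Main()
    {
        // Pull: the consumer asks the enumerator for the next value (foreach calls MoveNext).
        IEnumerable<int> pullSource = new[] { 1, 2, 3 };
        foreach (int value in pullSource)
            Console.WriteLine("Pulled: {0}", value);

        // Push: the source calls into the observer's OnNext and OnCompleted methods.
        IObservable<int> pushSource = new PushSource(1, 2, 3);
        using (pushSource.Subscribe(new ConsoleObserver())) { }
    }
}
```

The enumerable half drives the loop itself, while the observable half merely registers an observer and gets called back, which is the duality described above.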
4.
Even though we’ve shown those
interfaces, we’ll resist the temptation to implement them. Instead, the next
exercise will explain how to create observable sequences using functionality in
the Rx library. To recap the usage, we’ll start by writing a bit of
non-functional code (indeed, this won’t work just yet):
IDisposable subscription = source.Subscribe(handler);
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
5.
While the above shows a typical
use pattern of observable sequences, you may have noticed there’s little one
can do with an IObservable<T> object other than subscribing to it.
6.
In order to make observable sequences more useful,
the Rx library provides a whole bunch of operators to apply operations to
observable sequences and to compose them. So, while the .NET 4 BCL ships bare
bones interfaces, Rx provides rich functionality for them. In order to leverage
those, go to the Solution Explorer, right click on the project’s node, choose
Add Reference… In there, you should find System.Reactive under the
Assemblies\Extensions menu or by using the Search Assemblies options.
Add the System.Reactive assembly reference to your project. In this lab, we won’t cover the System.Reactive.Providers, System.Reactive.Windows.Forms and System.Reactive.Windows.Threading assemblies.
Note: Your actual version number for the Rx assembly may vary from the one shown in the screenshot above. As long as you have a later version than the one shown, you should be good to go.
Tips: If you can’t find the assemblies shown in the screenshot above, first wait a couple of seconds since the new Visual Studio 2010 dialog has been made asynchronous with regards to reference folder scanning. Also notice this doesn’t retain an alphabetical ordering, so you may have to trigger a sort on the first column. If the assemblies still don’t show up, check Rx is installed by checking “Add or remove programs” in the Control Panel.
7.
The System.Reactive assembly is what contains the
operators for observable sequences, implemented as extension methods as we
shall see in just a moment. Since many of those operators need to introduce
concurrency (a necessary evil in the face of asynchronous programming), the
notion of a scheduler was
introduced. To achieve architectural layering, target-specific scheduler
functionality for Windows Forms and WPF resides in target-specific assemblies:
System.Reactive.Windows.Forms and System.Reactive.Windows.Threading. Later on,
we’ll see operators like ObserveOn that use schedulers.
8.
With the assembly reference
added, we should now see more methods on IObservable<T> objects. Since
those are extension methods, a little experiment will reveal two buckets of
additional methods. Eliminate all of the namespace imports other than System
and observe the IntelliSense auto-completion list on an IObservable<T> object.
Notice the Subscribe extension methods being added through the System namespace. Those overloads allow one to avoid implementing the IObserver<T> interface at all, since one can specify any of the three
handler methods (OnNext, OnError, OnCompleted) using delegates. For example:
IDisposable subscription = source.Subscribe(
    (int x) => {
        Console.WriteLine("Received {0} from source.", x);
    },
    (Exception ex) => {
        Console.WriteLine("Source signaled an error: {0}.", ex.Message);
    },
    () => {
        Console.WriteLine("Source said there are no messages to follow anymore.");
    }
);
Exercise: It’s left as an exercise for the reader to eliminate any excessive syntax in the sample above, such as types that can be inferred, redundant curly braces, etc.
9.
To see operators that can be
applied to IObservable<T> objects, add a using directive to import the System.Reactive.Linq namespace. “Dot into” our source object again, this time revealing
a whole bunch of operators. For those familiar with LINQ, try finding some of
your favorite operators such as Where, Select, etc.
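As a small illustration of those operators, the sketch below filters and projects an observable sequence with Where and Select. It assumes the System.Reactive reference from the previous steps is in place, and it borrows the Range factory method (covered in Exercise 2) purely as a convenient source.

```csharp
using System;
using System.Reactive.Linq;

public class Program
{
    public static void Main()
    {
        // Range is introduced in Exercise 2; here it just provides the values 1 through 10.
        IObservable<int> source = Observable.Range(1, 10);

        // The same Where/Select shape as LINQ to Objects, applied to a push-based source.
        IObservable<string> query = source
            .Where(x => x % 2 == 0)        // keep the even values
            .Select(x => "even: " + x);    // project each value to a string

        using (query.Subscribe(
            s => Console.WriteLine("OnNext: {0}", s),
            ex => Console.WriteLine("OnError: {0}", ex.Message),
            () => Console.WriteLine("OnCompleted")))
        {
        }
    }
}
```

Nothing runs until Subscribe is called; the query itself is only a description of the pipeline.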
Conclusion: The
IObservable<T> and IObserver<T> interfaces represent a data source
and a listener, respectively. In order to observe an observable sequence’s
notifications, one gives it an observer object using the Subscribe method,
receiving an IDisposable object that can be used to unsubscribe. While those interfaces
are in the .NET 4 BCL, they’re only available through a System.Observable
assembly on other platforms. To enable rich functionality over observable
sequences (as we’ll discuss thoroughly in what follows), System.Reactive
provides a series of extension methods that can be imported through the System
and System.Reactive.Linq namespaces.
Exercise 2 – Creating observable sequences
Objective: Observable
sequences are rarely provided by implementing the IObservable<T>
interface directly. Instead, a whole set of factory methods exists that create primitive
observable sequences. Those factory methods provide a good means for initial
exploration of the core notions of observable sources and observers.
1.
Ensure the project setup of
Exercise 1 is still intact, i.e. the reference to System.Reactive is in place
and both the System and System.Reactive.Linq namespaces are imported in your
Program.cs file. Also ensure the following skeleton code is still present:
IObservable<int> source = /* We’ll explore a set of factory methods here */;
IDisposable subscription = source.Subscribe(
    x => Console.WriteLine("OnNext: {0}", x),
    ex => Console.WriteLine("OnError: {0}", ex.Message),
    () => Console.WriteLine("OnCompleted")
);
2.
In the above, we’ll substitute
the comment for various primitive sources and observe their behavior. We’ll
also contrast those with enumerable sequences. Since observable sequences
sometimes introduce concurrency to pump out their notifications (required for
asynchrony), we should prevent the program from quitting while the subscription
is active. We’ll hold the main thread by using Console.ReadLine to do so prior
to calling Dispose on the subscription:
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
3.
We’ll start by looking at the Empty factory method:
IObservable<int> source = Observable.Empty<int>();
Running this code produces the following output:
OnCompleted
In other words, the empty sequence simply signals completion to its observers by calling OnCompleted. This is very similar to LINQ to Objects’ Enumerable.Empty or an empty array (e.g. new int[0]). For those enumerable sequences, the first call to the enumerator’s MoveNext method will return false, signaling completion.
Background: One may wonder when observable sequences start running. In particular, what’s triggering the empty sequence to fire out the OnCompleted message to its observers? The answer differs from sequence to sequence. Most of the sequences we’re looking at in this exercise are so-called cold observables, which means they start running upon subscription. This is different from hot observables, such as mouse move events, which are flowing even before a subscription is active (there’s no way to keep the mouse from moving after all…).
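The difference between cold and hot sources can be sketched as follows. Subject<int> (from the System.Reactive.Subjects namespace) is not covered in this exercise; it is used here only as an assumed stand-in for a live source such as mouse moves, and Range is one of the cold factory methods discussed further on.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class Program
{
    public static void Main()
    {
        // Cold: nothing is produced until Subscribe is called.
        IObservable<int> cold = Observable.Range(1, 2);
        cold.Subscribe(x => Console.WriteLine("cold: {0}", x));

        // Hot: the source produces values whether or not anybody is listening.
        var hot = new Subject<int>();
        hot.OnNext(1);                       // no subscription yet, so this value is lost

        using (hot.Subscribe(x => Console.WriteLine("hot: {0}", x)))
        {
            hot.OnNext(2);                   // delivered to the active subscription
        }

        hot.OnNext(3);                       // subscription was disposed, so not delivered
    }
}
```

Only the value pushed while the subscription is active reaches the hot observer, whereas the cold sequence replays from its start for whoever subscribes.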
4.
Besides the OnCompleted message, OnError is also a
terminating notification, in the sense that no messages can follow it. Where Empty is
the factory method creating an observable sequence that immediately triggers
completion, the Throw method creates an observable sequence that immediately
triggers an OnError message to observers:
IObservable<int> source = Observable.Throw<int>(new Exception("Oops"));
Running this code produces the following output:
OnError: Oops
Background: The OnError message is typically used by an observable sequence (not as trivial as the one simply returned by a call to Throw) to signal an error state which could either originate from a failed computation or the delivery thereof. Following the semantic model of the CLR’s exception mechanism, errors in Rx are always terminating and exhibit a fail-fast characteristic, surfacing errors through observer handlers. More advanced mechanisms to deal with errors exist in the form of handlers called Catch, OnErrorResumeNext and Finally. We won’t discuss those during this HOL, but their role should be self-explanatory based on corresponding language constructs in various managed languages.
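Although this lab doesn’t cover them in depth, a brief sketch may help relate Catch and Finally to their language counterparts. It assumes the Catch overload that takes a typed exception handler and the Finally overload that takes an Action, both from System.Reactive.Linq; the fallback value -1 is an arbitrary choice for illustration.

```csharp
using System;
using System.Reactive.Linq;

public class Program
{
    public static void Main()
    {
        IObservable<int> source = Observable.Throw<int>(new Exception("Oops"))
            // Like a catch block: handle the error by continuing with another sequence.
            .Catch((Exception ex) =>
            {
                Console.WriteLine("Caught: {0}", ex.Message);
                return Observable.Return(-1);
            })
            // Like a finally block: runs when the sequence terminates or is unsubscribed.
            .Finally(() => Console.WriteLine("Finally"));

        using (source.Subscribe(
            x => Console.WriteLine("OnNext: {0}", x),
            ex => Console.WriteLine("OnError: {0}", ex.Message),
            () => Console.WriteLine("OnCompleted")))
        {
        }
    }
}
```

Because the error is caught and replaced by a fallback sequence, the observer’s OnError handler is not expected to fire here; it sees the fallback value followed by completion.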
5.
One final essential factory method or primitive
constructor is called Return. Its role is to represent a single-element
sequence, just like a single-cell array would be in the world of IEnumerable
sequences. The behavior observed by subscribed observers is two messages:
OnNext with the value, and OnCompleted signaling that the end of the sequence has
been reached:
IObservable<int> source = Observable.Return(42);
Running this code produces the following output:
OnNext: 42
OnCompleted
Background: Return plays an essential role in the theory behind LINQ, known as monads. Together with an operator called SelectMany (which we’ll learn more about later on), they form the primitive functions needed to leverage the power of monadic computation. More information can be found by searching the web using monad and functional as the keywords.
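As a hedged preview of how Return and SelectMany fit together, the sketch below maps every element of a source to a single-element sequence and flattens the results. The multiplication by 10 is an arbitrary choice for illustration, and Range (discussed in the next step) only serves as a source.

```csharp
using System;
using System.Reactive.Linq;

public class Program
{
    public static void Main()
    {
        // For each x, Return wraps one value in a sequence; SelectMany flattens
        // all of those inner sequences into a single observable sequence.
        IObservable<int> source = Observable.Range(1, 3)
            .SelectMany(x => Observable.Return(x * 10));

        using (source.Subscribe(
            x => Console.WriteLine("OnNext: {0}", x),
            ex => Console.WriteLine("OnError: {0}", ex.Message),
            () => Console.WriteLine("OnCompleted")))
        {
        }
    }
}
```

The observer should receive the values 10, 20 and 30, followed by completion once the source and all inner sequences have finished.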
6.
At this point, we’ve seen the most trivial
observable sequence constructors that are intimately related to an observer’s
triplet of methods. While those are of interest in certain cases, meatier
sequences are worth exploring as well. The Range operator is just one operator
that generates such a sequence. Symmetric to the same operator on Enumerable,
Range returns a sequence of 32-bit integer values given a starting value
and a length:
IObservable<int> source = Observable.Range(5, 3);
Running this code produces the following output:
OnNext: 5
OnNext: 6
OnNext: 7
OnCompleted
Note: As with all the sequences mentioned in this exercise, Range is a cold observable. To recap, this simply means that it starts producing its results to an observer upon subscription. Another property of cold observable sequences is that every subscription causes the sequence to be evaluated again. Thus, if two calls to Subscribe are made, both of the observers passed to those calls will receive the messages from the observable. The fact that the observation has run to completion for one observer does not prevent other observers from receiving the sequence. Whether or not the produced data is the same for every observer depends on the characteristics of the sequence that’s being generated. For deterministic and “purist functional” operators like Return and Range, the messages delivered to every observer will be the same. However, one could imagine other kinds of observable sequences that depend on side-effects and thus deliver different results for every observer that subscribes to them.
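Both cases from the note can be tried out with the sketch below. The first half subscribes twice to the same Range sequence; the second half uses Observable.Defer, an operator not covered in this exercise and assumed here only as a simple way to build a sequence whose content depends on a side-effect (a hypothetical subscription counter).

```csharp
using System;
using System.Reactive.Linq;

public class Program
{
    public static void Main()
    {
        // Deterministic cold sequence: every subscriber sees 5, 6, 7.
        IObservable<int> range = Observable.Range(5, 3);
        range.Subscribe(x => Console.WriteLine("A: {0}", x));
        range.Subscribe(x => Console.WriteLine("B: {0}", x));

        // Side-effecting cold sequence: the factory passed to Defer runs once per
        // subscription, so each subscriber observes a different counter value.
        int subscriptions = 0;
        IObservable<int> counting = Observable.Defer(
            () => Observable.Return(++subscriptions));
        counting.Subscribe(x => Console.WriteLine("first: {0}", x));
        counting.Subscribe(x => Console.WriteLine("second: {0}", x));
    }
}
```

Observers A and B receive identical messages, while the two subscriptions to the counting sequence receive 1 and 2 respectively.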
7.
To generalize the notion of sequence creation in
terms of a generator function, the Generate constructor function was added to
Rx. It closely resembles the structure of a for-loop as one would use to
generate an enumerable sequence using C# iterators (cf. the “yield return” and
“yield break” statements). To do so, it takes a number of delegate-typed
parameters that expect functions to check for termination, to iterate one step
and to emit a result that becomes part of the sequence and is sent to the
observer:
IObservable<int> source = Observable.Generate(0, i => i < 5, i => i + 1, i => i * i);
Running this code produces the following output:
OnNext: 0
OnNext: 1
OnNext: 4
OnNext: 9
OnNext: 16
OnCompleted
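Note: An overload of Generate exists that takes an additional time selector and waits a computed amount of time before moving on to the next iteration (earlier Rx releases exposed this as a separate operator called GenerateWithTime). We’ll look at this one in just a moment.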
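The resemblance to a for-loop can be made explicit with an enumerable counterpart. The Generate method below is a hypothetical helper written for this comparison (it is not an Rx or BCL API); it takes the same four arguments as Observable.Generate and uses a C# iterator to produce the pull-based equivalent.

```csharp
using System;
using System.Collections.Generic;

public class Program
{
    // Hypothetical pull-based counterpart of Observable.Generate: the four
    // parameters map onto the initializer, condition, iterator and body of a for-loop.
    public static IEnumerable<TResult> Generate<TState, TResult>(
        TState initialState,
        Func<TState, bool> condition,
        Func<TState, TState> iterate,
        Func<TState, TResult> resultSelector)
    {
        for (TState state = initialState; condition(state); state = iterate(state))
            yield return resultSelector(state);
    }

    public static void Main()
    {
        // Same arguments as the observable sample: produces 0, 1, 4, 9, 16.
        foreach (int x in Generate(0, i => i < 5, i => i + 1, i => i * i))
            Console.WriteLine(x);
    }
}
```

Where the iterator hands a value back each time the consumer asks for the next one, the observable version calls OnNext for each value and OnCompleted when the condition fails.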
8.
One operator that may seem interesting from a
curiosity point of view only is called Never. It creates an observable sequence
that will never signal any notification to a subscribed observer:
IObservable<int> source = Observable.Never<int>();
Running this code shouldn’t produce any output till the end of mankind.
Background: One reason this operator has a role is to reason about composition with other sequences, e.g. what would it mean to concatenate a finite and an infinite sequence? Should the result also exhibit an infinite characteristic? Those answers are essential to ensuring all of the Rx semantics make sense and are consistent throughout various operators. Besides the operator’s theoretical use, there are real use cases for it as well. For example, you may use it to ensure a sequence never terminates by avoiding an OnCompleted handler getting triggered. Another use is to test whether an application does not hang in the presence of non-termination.
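The concatenation question raised above can be explored with a small sketch. It assumes the Concat operator from System.Reactive.Linq, which hasn’t been introduced in this lab yet, and appends a never-ending sequence to a single-element one.

```csharp
using System;
using System.Reactive.Linq;

public class Program
{
    public static void Main()
    {
        // A finite sequence followed by one that never produces any notification.
        IObservable<int> source = Observable.Return(1).Concat(Observable.Never<int>());

        using (source.Subscribe(
            x => Console.WriteLine("OnNext: {0}", x),
            ex => Console.WriteLine("OnError: {0}", ex.Message),
            () => Console.WriteLine("OnCompleted")))
        {
            // Leaving the using block disposes the subscription; the completion
            // of Return is swallowed because Concat moves on to Never.
        }

        Console.WriteLine("Unsubscribed; the sequence never signaled its end.");
    }
}
```

The value 1 is delivered, but the completion handler is not triggered, so the concatenated sequence inherits the infinite characteristic of its second part.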
9.
Where the Subscribe method has
an asynchronous characteristic, other blocking operators exist to observe a
sequence in a synchronous manner. One such operator is called ForEach.
First of all, let’s formalize “asynchronous”. It’s nearly impossible to say something is asynchronous in itself without mentioning two parties. In particular, the thread calling the Subscribe method on an observable sequence is not blocked till the sequence runs to completion, which may happen on another thread. That is, Subscribe is asynchronous in that the caller is not blocked till the observation of the sequence completes.
For demos and testing it’s often useful to perform a “blocking subscribe” such that the caller is blocked till the observable sequence triggers OnError or OnCompleted to the observer. That’s what ForEach is all about:
IObservable<int> source = Observable.Range(0, 10);
source.ForEach(
    x => Console.WriteLine("OnNext: {0}", x),
    ex => Console.WriteLine("OnError: {0}", ex.Message),
    () => Console.WriteLine("OnCompleted")
);
In here, we won’t get to the next statement till the sequence completes gracefully or exceptionally.
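To see the non-blocking nature of Subscribe for ourselves, the sketch below subscribes to a time-based sequence and reports when Subscribe returns. It assumes the Observable.Timer factory method (not covered in this exercise), and it uses a ManualResetEvent from System.Threading to keep the program alive until the sequence terminates; the 200 ms delay is an arbitrary choice.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public class Program
{
    public static void Main()
    {
        var done = new ManualResetEvent(false);

        // Produces a single value (0) after the given delay, then completes.
        IObservable<long> source = Observable.Timer(TimeSpan.FromMilliseconds(200));

        IDisposable subscription = source.Subscribe(
            x => Console.WriteLine("OnNext: {0}", x),
            ex => { Console.WriteLine("OnError: {0}", ex.Message); done.Set(); },
            () => { Console.WriteLine("OnCompleted"); done.Set(); });

        // Reached right away: Subscribe did not wait for the timer to fire.
        Console.WriteLine("Subscribe returned");

        // Block the main thread manually until a terminating notification arrives.
        done.WaitOne();
        subscription.Dispose();
    }
}
```

The “Subscribe returned” line shows up before the notifications, which arrive later on a different thread.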
10.
Finally, let’s inspect the behavior of an observable
sequence by looking at it through the lenses of the debugger in Visual Studio.
We’ll use the following fragment to do so:
IObservable<int> source = Observable.Generate(
    0,
    i => i < 5,
    i => i + 1,
    i => i * i,
    i => TimeSpan.FromSeconds(i)
);
using (source.Subscribe(
    x => Console.WriteLine("OnNext: {0}", x),
    ex => Console.WriteLine("OnError: {0}", ex.Message),
    () => Console.WriteLine("OnCompleted")
))
{
    Console.WriteLine("Press ENTER to unsubscribe...");
    Console.ReadLine();
}
This sample uses a different overload of Generate that takes an additional time selector, allowing the time between produced results to depend on the loop variable. In this case, 0 will be produced upon subscription, followed by 1 a second later, then 4 two seconds later, 9 three seconds later and 16 four seconds later. Notice how the notion of time, all-important in an asynchronous world, is entering the picture here.
Note: Typically you won’t immediately dispose a subscription by means of a using block. In this sample it works fine since the code inside the block waits for user input. In most cases you’ll store the IDisposable object in order to dispose it at a later time, or you won’t even bother to dispose it (e.g. for infinite sequences like timers).
a.
Set a breakpoint on the body of the OnNext lambda expression
(the first argument to Subscribe) using F9. Notice you need to be inside the
lambda body with the cursor in the editor in order for the breakpoint to be set
on the body and not the outer method call to Subscribe.
b.
Start running the program by
pressing F10 and step down in the code using F10. You’ll not see the breakpoint
being hit just yet and will effectively reach the Console.ReadLine call:
c.
What’s happening here is that
the call to Subscribe started a background thread to pump out the observable
sequence’s values based on a timer. We’ll learn more about the concurrency
aspects of Rx later on, but for now let’s verify this hypothesis by looking at
the Threads window in the debugger (Debug, Windows, Threads or CTRL+D,T):
Note: To see call stack frames for System.CoreEx and System.Reactive you have to disable the “Just My Code” feature. Alternatively, switch to the Call Stack window, right-click and select Show External Code.
What can be seen from the above is the worker thread is clearly driven by a System.Threading timer running code in the ThreadPoolScheduler from System.Concurrency. Rx always uses primitives in this namespace to create concurrency when it needs to do so.
d.
Now hit F5 to continue execution. At this time, we
end up hitting our breakpoint which runs on the background thread as can be
seen from the debugger:
Notice the main thread is still in Console.ReadLine (the gray adorner reveals a different thread) while our call stack reflects the timer-based background thread pumping out an OnNext message for x = 1.
e.
The reader should feel free to
hit F5 a couple more times to see the breakpoint getting hit for every
subsequent OnNext message flowing out of the observable sequence. Setting a
breakpoint on the OnCompleted will show the same multi-threaded behavior for
the Generate sequence:
Whether or not a sequence pumps out messages to subscribed observers on a background thread depends on a number of factors. We won’t detail this aspect at this point and will encounter the use of synchronization primitives in Exercise 3. Suffice to say that the developer can control this aspect by manually specifying an IScheduler object if ever needed.
Conclusion: Creating observable sequences does not require
manual implementation of the IObservable<T> interface, nor does the use
of Subscribe require an IObserver<T> implementation. For the former, a
series of operators exist that create sequences with zero, one or more
elements. For the latter, Subscribe extension methods exist that take various
combinations of OnNext, OnError and OnCompleted handlers in terms of delegates.
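To make this conclusion concrete, here is a minimal sketch that builds a sequence with the Range factory operator and observes it through three delegates instead of a hand-written IObserver<T>. It assumes a console project referencing the System.Reactive package; the SubscribeDemo class and its Collect helper are names made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class SubscribeDemo
{
    // Hypothetical helper: records every notification of a small sequence.
    public static List<string> Collect()
    {
        var log = new List<string>();

        // Range is a factory operator; no IObservable<T> class is written by hand.
        IObservable<int> source = Observable.Range(1, 3);

        // Subscribe overload taking OnNext, OnError and OnCompleted delegates.
        using (source.Subscribe(
            x => log.Add("OnNext: " + x),
            ex => log.Add("OnError: " + ex.Message),
            () => log.Add("OnCompleted")))
        {
            // Assumption: with Range's default scheduler in current Rx releases,
            // all values have been delivered by the time Subscribe returns.
        }

        return log;
    }

    static void Main()
    {
        foreach (var line in Collect())
            Console.WriteLine(line);
    }
}
```

Passing only the first delegate is also possible; an overload of Subscribe exists for that case.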
Exercise 3 – Importing .NET events into Rx
Objective: Creating observable
sequences out of nowhere using various factory “constructor” methods
encountered in the previous exercise is one thing. Being able to bridge with
existing sources of asynchrony in the framework is even more important. In this
exercise we’ll look at the FromEventPattern operator that allows “importing” a
.NET event into Rx as an observable collection. Every time the event is raised,
an OnNext message will be delivered to the observable sequence.
Background: Rx doesn’t aim at replacing existing
asynchronous programming models such as .NET events, the asynchronous pattern
or the Task Parallel Library. Those concepts or frameworks are often perfectly
suited for direct use, e.g. using event handlers in C#. However, once
composition enters the picture, using those low-level concepts tends to be a grueling
experience where one has to deal with resource maintenance (e.g. when to
unsubscribe) and often has to reinvent the wheel (e.g. how do you “filter” an
event?). Needless to say, all of this can be very error prone and distracts the
developer from the real problem being solved. In this sample and the ones that
follow, we’ll show where the power of Rx comes in: composition of asynchronous
data sources.
1.
Windows Forms is a good sample
of a framework API that’s full of events. To reduce noise and be in absolute
control, we won’t create a Windows Forms project but will start from a new
Console Application project. Once it’s been created, add a reference to
System.Windows.Forms and System.Drawing as well as System.Reactive:
2.
Enter the following code to
create a new form and start running it by calling Application.Run:
using System;
using System.Reactive.Linq;
using System.Windows.Forms;
class Program
{
    static void Main()
    {
        var frm = new Form();
        Application.Run(frm);
    }
}
Running the code above should simply show a form and cause the program to quit upon closing the form.
3.
In order to contrast the world
of Rx from classic .NET events, let’s start by showing the well-understood way
to deal with the latter. For our running sample, we’ll be dealing with the
MouseMove event:
var lbl = new Label();
var frm = new Form {
    Controls = { lbl }
};
frm.MouseMove += (sender, args) => {
    lbl.Text = args.Location.ToString(); // This has become a position-tracking label.
};
Application.Run(frm);
// We’re sloppy and “forget” to detach the handler... This is harder than you may
// expect due to the use of a lambda expression (see point 4 below).
While this works great, there are a number of limitations associated with classic .NET events:
·
Events are hidden data sources.
It requires looking at the handler’s code to see this. Did you ever regard the
MouseMove event as a collection of Point values? In the world of Rx, we see
events as just another concrete form of observable sequences: your mouse is a
database of Point values!
·
Events cannot be handed out,
e.g. an event producing Point values cannot be handed out to a GPS service. The
deeper reason for this is that events are not first-class objects. In the world
of Rx, each observable sequence is represented using an object that can be
passed around or stored.
·
Events cannot be composed
easily. For example, you can’t hire a mathematician to write a generic filter
operator that will filter an event’s produced data based on a certain
criterion. In the world of Rx, due to the first-class object nature of
observables, we can provide generic operators like Where.
·
Events require manual handler
maintenance which requires you to remember the delegate that was handed to it.
It’s like keeping your hands on the money you paid for your newspaper
subscription in order to be able to unsubscribe. In the world of Rx, you get an
IDisposable handle to unsubscribe.
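The last limitation is easy to trip over when lambda expressions are used. The sketch below uses only the base class library; Ticker and HandlerDemo are hypothetical names introduced for the illustration. It shows that detaching with a second, textually identical lambda removes nothing, whereas remembering the delegate works.

```csharp
using System;

// Hypothetical event source used only for this illustration.
class Ticker
{
    public event EventHandler Tick;

    public void Raise()
    {
        var handler = Tick;
        if (handler != null) handler(this, EventArgs.Empty);
    }
}

static class HandlerDemo
{
    // Returns how often each handler ran: { sloppy, careful }.
    public static int[] Run()
    {
        var ticker = new Ticker();
        int sloppy = 0, careful = 0;

        // Attempt 1: "detach" using a second lambda that looks the same.
        ticker.Tick += (s, e) => sloppy++;
        ticker.Tick -= (s, e) => sloppy++; // with current C# compilers this is a different delegate: nothing is removed
        ticker.Raise();                    // the sloppy handler still runs

        // Attempt 2: remember the delegate that was handed to the event.
        EventHandler remembered = (s, e) => careful++;
        ticker.Tick += remembered;
        ticker.Tick -= remembered;         // same delegate: really detached
        ticker.Raise();                    // the sloppy handler runs again, the careful one doesn't

        return new[] { sloppy, careful };
    }

    static void Main()
    {
        var counts = Run();
        Console.WriteLine(counts[0] + ", " + counts[1]); // prints "2, 0" with current compilers
    }
}
```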
4.
Now let’s see how things look
when using Rx. To import an event into Rx, we use the FromEventPattern
operator, telling it the type of the EventArgs objects raised by the event
being bridged. Two kinds of overloads exist: one takes a pair of functions used to
attach and detach a handler, and one uses reflection to find those
add/remove methods on your behalf. We’ll go for the latter approach here:
var lbl = new Label();
var frm = new Form {
    Controls = { lbl }
};
var moves = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
using (moves.Subscribe(evt => {
    lbl.Text = evt.EventArgs.Location.ToString();
}))
{
    Application.Run(frm);
}
// Proper clean-up just got a lot easier...
We’ll now explain this fragment in depth to drive home the points we made before:
·
The FromEventPattern operator turns
the given event into an observable sequence with an EventPattern<MouseEventArgs> element
type that captures both the sender and the event arguments. Hovering over the
var keyword reveals the type inferred for the moves local variable, IObservable<EventPattern<MouseEventArgs>>.
When calling Subscribe, a handler is attached to the underlying event. Every time the event gets raised, the sender and arguments are stowed away in an EventPattern<MouseEventArgs> object that’s sent to all of the
observable’s subscribers.
·
Inside our OnNext handler, due
to the strong typing provided by generics, we can “dot into” the event
arguments’ Location property. We’ll see later how we can leverage built-in
operators to shake off the need to traverse this object graph in the handler.
·
Clean-up of the event handler
is taken care of by the IDisposable object returned by the Subscribe
call. Calling Dispose – here achieved by reaching the end of the
using-block – will automatically get rid of the underlying event handler.
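Step 4 mentioned a second style of overload that takes a pair of attach and detach functions instead of relying on reflection. The following is a sketch of that style, not the lab's own code. It assumes the System.Reactive package and a console application (no SynchronizationContext), and it uses a made-up Thermometer class with a Changed event as a stand-in for a Windows Forms control so that the event can be raised from code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical event arguments and event source for this illustration.
class ReadingEventArgs : EventArgs
{
    public ReadingEventArgs(int value) { Value = value; }
    public int Value { get; private set; }
}

class Thermometer
{
    public event EventHandler<ReadingEventArgs> Changed;

    public void Report(int value)
    {
        var handler = Changed;
        if (handler != null) handler(this, new ReadingEventArgs(value));
    }
}

static class FromEventPatternDemo
{
    public static List<int> Run()
    {
        var thermometer = new Thermometer();
        var seen = new List<int>();

        // Overload taking explicit add/remove handler functions (no reflection).
        IObservable<EventPattern<ReadingEventArgs>> readings =
            Observable.FromEventPattern<EventHandler<ReadingEventArgs>, ReadingEventArgs>(
                h => thermometer.Changed += h,
                h => thermometer.Changed -= h);

        thermometer.Report(1);      // nobody has subscribed yet: not observed

        using (readings.Subscribe(evt => seen.Add(evt.EventArgs.Value)))
        {
            thermometer.Report(20); // observed
            thermometer.Report(21); // observed
        }

        thermometer.Report(22);     // subscription disposed: not observed
        return seen;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "20, 21"
    }
}
```

Compared to the reflection-based overload, the event name is checked by the compiler here rather than looked up from a string at run time.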
5.
To master the technique a bit further, let’s have a
look at another Windows Forms event. First add a TextBox control to the form,
which we can conveniently do using C# 3.0 object initializer syntax:
var txt = new TextBox();
var frm = new Form
{
    Controls = { txt }
};
Note: Alternatively, feel free to use the proper Windows Forms designer to build the UI and set up observable sequences from the Form_Load event.
6.
Restructure the code to look as
follows in order to have both the Form’s MouseMove and the TextBox’s
TextChanged event. This time, we’ll print to the console as a means to do
logging. In Exercise 5 we’ll learn about a specialized operator (called Do)
that can be used for this purpose.
var moves = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
var input = Observable.FromEventPattern(txt, "TextChanged");
var movesSubscription = moves.Subscribe(evt =>
{
    Console.WriteLine("Mouse at: " + evt.EventArgs.Location);
});
var inputSubscription = input.Subscribe(evt =>
{
    Console.WriteLine("User wrote: " + ((TextBox)evt.Sender).Text);
});
using (new CompositeDisposable(movesSubscription, inputSubscription))
{
    Application.Run(frm);
}
At this point it may seem we haven’t gained too much yet. Things are just “different”. What really matters though is that we’ve put ourselves in the world of IObservable<T> sequences, over which a lot of operators are defined that
we’ll talk about in a moment.
For one thing, notice all the hoops one has to go through in order to get the text out of a TextChanged event. As mentioned before, classic .NET events don’t explicitly exhibit a data-oriented nature. This particular event is a great example of this observation: from the event handler of a TextChanged event one doesn’t immediately get the text after the change has occurred, even though that’s what 99% of uses of the event are about (the other 1% may be justified by “state invalidation handling”, e.g. to enable “Do you want to save changes?” behavior).
Finally, notice the way we can group the IDisposable subscription handlers together using one of the types in the System.Reactive.Disposables namespace. CompositeDisposable simply creates an IDisposable object that will dispose all of its contained IDisposable objects upon calling Dispose. We’ll omit further exploration of this namespace in this hands-on lab.
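The grouping behavior of CompositeDisposable can be tried in isolation. The sketch below assumes the System.Reactive package; the two Disposable.Create objects are hypothetical stand-ins for real subscriptions, and CompositeDemo is a name made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;

static class CompositeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Stand-ins for the IDisposable objects returned by Subscribe.
        IDisposable first = Disposable.Create(() => log.Add("first disposed"));
        IDisposable second = Disposable.Create(() => log.Add("second disposed"));

        // Leaving the using block disposes the composite,
        // which in turn disposes everything it contains.
        using (new CompositeDisposable(first, second))
        {
            log.Add("inside using block");
        }

        return log;
    }

    static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```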
7.
A fragment of sample output is
shown below:
Conclusion: .NET events are just one
form of asynchronous data sources. In order to use them as observable
collections, Rx provides the FromEventPattern operator. In return one gets
EventPattern objects containing the sender and event arguments.
Exercise 4 – A first look at some Standard Query Operators
Objective: Looking at observable
sequences as asynchronous data sources is what enables them to be queried, just
like any other data source. Anyone who talks about querying in the context of C#
programming nowadays immediately thinks of LINQ. In this exercise we’ll show how to use the
LINQ syntax to write queries over observable collections.
1.
Continuing with the previous
exercise’s code, let’s have a look back at the code we wrote to handle various
UI-related events brought into Rx using the FromEventPattern operator:
var moves = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
var input = Observable.FromEventPattern(txt, "TextChanged");
var movesSubscription = moves.Subscribe(evt =>
{
    Console.WriteLine("Mouse at: " + evt.EventArgs.Location);
});
var inputSubscription = input.Subscribe(evt =>
{
    Console.WriteLine("User wrote: " + ((TextBox)evt.Sender).Text);
});
Recall the type of the moves and input collections, both of which are IObservable<EventPattern<TEventArgs>> objects, where TEventArgs is
obtained from the generic parameter on FromEventPattern. Quite often we’re not
interested in all of the information an EventPattern object captures and want to “shake off”
redundant stuff.
2.
In a classic .NET event world –
and in any other programming model for asynchronous data sources before the
advent of Rx for that matter – such data-intensive operations often led to
imperative code. For events, many of you will likely have written code like
this:
private void frm_MouseMove(object sender, MouseEventArgs e)
{
    Point position = e.Location;
    if (position.X == position.Y)
    {
        // Only want to respond to events for mouse moves
        // over the first bisector of the form.
    }
}
private void txt_TextChanged(object sender, EventArgs e)
{
    string text = ((TextBox)sender).Text;
    // And now we can forget about sender and e parameters...
}
What we’ve really done here is mimic data-intensive “sequence operators” in an imperative way. The first sample shows a filter using an if-statement; the second one embodies a projection using another local variable. In doing so, we’ve lost an important property though. The parts represented by the comments no longer directly operate on an event but are lost in a sea of imperative code. In other words, it’s not possible to filter an event and get another event back.
3.
In the brave new Rx world, we
can do better than this. Since observable sequences have gained first-class
status by being represented as IObservable<T> objects, we can define operators over them by providing a whole
set of (generic extension) methods.
By ensuring some of those methods have the right signature, LINQ syntax works
out of the box. Let’s have a look at how we’d revamp the imperative event
handler code above using those query operators over observable sequences. First,
let’s turn the event-based input sequences into what we wish they’d look like:
var moves = from evt in Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove")
            select evt.EventArgs.Location;
var input = from evt in Observable.FromEventPattern(txt, "TextChanged")
            select ((TextBox)evt.Sender).Text;
var movesSubscription = moves.Subscribe(pos => Console.WriteLine("Mouse at: " + pos));
var inputSubscription = input.Subscribe(inp => Console.WriteLine("User wrote: " + inp));
Using a query expression in C#, we project away the two EventPattern<TEventArgs>
data types in favor of a Point and a string,
respectively. As a result both the moves and input sequences now are observable
sequences of a meaningful data type that just captures what we need.
The query expression syntax is simply shorthand syntax for query operator methods. The above corresponds to the following equivalent code:
var moves = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove")
                      .Select(evt => evt.EventArgs.Location);
var input = Observable.FromEventPattern(txt, "TextChanged")
                      .Select(evt => ((TextBox)evt.Sender).Text);
var movesSubscription = moves.Subscribe(pos => Console.WriteLine("Mouse at: " + pos));
var inputSubscription = input.Subscribe(inp => Console.WriteLine("User wrote: " + inp));
Background: The ability to represent asynchronous data sources as first-class objects is what enables operators like this to be defined. Being able to produce an observable sequence that operates based on input of one or more sequences is not just interesting from a data point of view. Equally important is the ability to control the lifetime of subscriptions. Suppose someone subscribes to, say, the input sequence. What really happens here is that the Select operator’s result sequence gets a request to subscribe an observer. In turn, it propagates this request to its source, which happens to be a sequence produced by FromEventPattern. Ultimately, the event-wrapping source sequence hooks up an event handler. Disposing a subscription is propagated in a similar manner.
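The propagation described in this background note can be made visible with a small experiment. The sketch below assumes the System.Reactive package and replaces the event-wrapping source with a hypothetical one built with Observable.Create, which logs when it is subscribed to and when its subscription is disposed. PropagationDemo is a name made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class PropagationDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical source standing in for the FromEventPattern sequence.
        IObservable<int> source = Observable.Create<int>(observer =>
        {
            log.Add("source: subscribed");   // comparable to hooking up the event handler
            observer.OnNext(1);
            return Disposable.Create(() => log.Add("source: disposed"));
        });

        // Building the query does not subscribe to anything yet.
        IObservable<string> projected = source.Select(x => "value " + x);
        log.Add("query built");

        // Subscribing to Select's result propagates the request to the source.
        IDisposable subscription = projected.Subscribe(s => log.Add("observer: " + s));

        // Disposing travels the same path back to the source.
        subscription.Dispose();
        return log;
    }

    static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
        // Prints, in order: query built / source: subscribed /
        // observer: value 1 / source: disposed
    }
}
```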
4.
With the projections in place to reduce the noise on
input sequences, we can now easily filter the mouse moves to those over the
first bisector (where x and y coordinates are equal). How do you perform a
filter in LINQ? Use the where keyword:
var overFirstBisector = from pos in moves
                        where pos.X == pos.Y
                        select pos;
var movesSubscription = overFirstBisector.Subscribe(pos =>
    Console.WriteLine("Mouse at: " + pos));
The type for both moves and overFirstBisector will be IObservable<Point>.
5.
A sample of the output is shown
below. All of the emitted mouse move messages satisfy the filter constraint we
specified in the query:
Conclusion: The first-class nature of
observable sequences as IObservable<T> objects is what allows
generic operators (sometimes referred to as combinators) to be defined
over them. The majority of those operators produce another observable sequence.
This allows continuous “chaining” of operators to manipulate an asynchronous
data source’s emitted results till the application’s requirements are met. LINQ
syntax provides a convenient way to carry out some of those common operators.
Others will be discussed further on.
Exercise 5 – More query operators to tame the user’s input
Objective: Observable
sequences are often not well-behaved for the intended usage. We may get data
presented in one form but really want it in another shape. For this, simple
projection can be used as shown in the previous exercise. But there are far
more cases of ill-behaved data sources. For example, duplicate values may come
out. But there’s more beyond the perspective of observable sequences as “just
data sources”. In particular, asynchronous data sources have an intrinsic
notion of timing. What if a source goes too fast for consumers to deal with
their data? We’ll learn how to deal with those situations in this exercise.
From this exercise on, we’ll follow
a common theme: the typical “dictionary suggest” sample for asynchronous
programming. The idea is to let the user type a term in a textbox and show all
the words that start with the term, by consulting a web service. To keep the UI
from freezing, we have to do this kind of work in an asynchronous manner. Rx is
a great fit for this kind of composition. But first things first, let’s see how
our form’s TextBox control is behaving.
1.
In what follows, we won’t need
the MouseMove event anymore, so let’s stick with just a single TextBox control
and its (projected) TextChanged event:
var input = from evt in Observable.FromEventPattern(txt, "TextChanged")
            select ((TextBox)evt.Sender).Text;
using (input.Subscribe(inp => Console.WriteLine("User wrote: " + inp)))
{
    Application.Run(frm);
}
Note: UI designer fanatics should feel free to start from a clean slate with a Windows Forms (or WPF) designer and put code equivalent to the above in a classic (which is perfectly fine for this purpose) Load event handler. In such a scenario, you’ll likely store the subscription IDisposable object in a field to dispose it when the form or window is unloaded or closed.
2.
Now, let’s carry out a few experiments. Start the
application and type “reactive” (without the quotes) in the textbox. Obviously
you should see no less than eight events being handled through the observer.
However, notice what happens if you select a single letter in the word and
overwrite it by the same letter:
r e a c t|i v e → (SHIFT-LEFT ARROW) r e a c t i v e → (type t) r e a c t|i v e
The screenshot below shows the corresponding output. What’s this duplicate message at the end about? Didn’t we ask for TextChanged events? Yes, but it turns out UI frameworks do not keep track of the last value entered by the user and may raise false positives based on internal state changes.
Notice the same “issue” appears when pasting the same text over the entire selection of a TextBox (e.g. CTRL-A, CTRL-C, CTRL-V to exhibit the quirk).
3.
Assume for a moment we take the
user input and feed it to a dictionary suggest web service (as we will later
on) which charges the user or application vendor for every request made to the
service. Do you really want to pay twice the price to lookup “reactive” because
of some weird behavior in the UI framework? Likely not.
So how would you solve the issue in an Rx-free world? Well, you’d keep some state somewhere to keep track of the last value seen and only propagate the input through in case it differs from the previous input. All of this clutters the code significantly with things like a private field, an if-statement and additional assignment in the event handler, etc. But worse, all the logic goes in an event handler which lacks composition: at no point do we have an asynchronous data source, free of duplicates, that we can get our hands on (e.g. to hand it to another component in the system).
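For comparison, the Rx-free approach just described could look roughly like the sketch below. It uses only the base class library; InputSource, ManualDeduplicator and ManualDemo are hypothetical names, with InputSource standing in for the TextBox so the example can run without a UI.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a text box raising change notifications.
class InputSource
{
    public event Action<string> TextChanged;

    public void Raise(string text)
    {
        var handler = TextChanged;
        if (handler != null) handler(text);
    }
}

class ManualDeduplicator
{
    private string _last;   // the private field: last value seen
    private bool _hasLast;  // distinguishes "no value yet" from a null value

    public readonly List<string> Forwarded = new List<string>();

    public void OnTextChanged(string text)
    {
        // The if-statement: drop the value when it equals the previous one.
        if (_hasLast && text == _last) return;

        // The additional assignment: remember the value for the next event.
        _last = text;
        _hasLast = true;

        // The "real" work is buried in the handler and cannot be handed out.
        Forwarded.Add(text);
    }
}

static class ManualDemo
{
    public static List<string> Run()
    {
        var source = new InputSource();
        var dedup = new ManualDeduplicator();
        source.TextChanged += dedup.OnTextChanged;

        foreach (var text in new[] { "reactiv", "reactive", "reactive", "reactive" })
            source.Raise(text);

        source.TextChanged -= dedup.OnTextChanged; // manual handler maintenance
        return dedup.Forwarded;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "reactiv, reactive"
    }
}
```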
In Rx, thanks to the power of composition, we get away with a single operator call that does all the comparison and state maintenance on our behalf. This operator is called DistinctUntilChanged:
var input = (from evt in Observable.FromEventPattern(txt, "TextChanged")
             select ((TextBox)evt.Sender).Text)
            .DistinctUntilChanged();
using (input.Subscribe(inp => Console.WriteLine("User wrote: " + inp)))
{
    Application.Run(frm);
}
Overloads exist that accept an IEqualityComparer<T> object to carry out the check for equality between
the current and previous elements.
With this fix in place, runs of identical values will only cause the first such value to be propagated. If the value received from the source is the very first value or differs from the previous one, it goes out at that very moment. If it’s the same as the previous value, it’s muted and attached observers don’t get to see it.
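To see the comparer overload at work without a UI, the sketch below feeds a fixed array through DistinctUntilChanged twice, once with an ordinal and once with a case-insensitive comparer. It assumes the System.Reactive package; DistinctDemo and the sample inputs are made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class DistinctDemo
{
    // Runs a fixed input through DistinctUntilChanged using the given comparer.
    public static List<string> Run(IEqualityComparer<string> comparer)
    {
        var seen = new List<string>();
        var inputs = new[] { "rx", "Rx", "RX", "linq", "rx" };

        // Assumption: with ToObservable's default scheduler in current Rx releases,
        // the whole array has been delivered by the time Subscribe returns.
        using (inputs.ToObservable()
                     .DistinctUntilChanged(comparer)
                     .Subscribe(s => seen.Add(s)))
        {
        }

        return seen;
    }

    static void Main()
    {
        // Ordinal comparison: no two neighbors are equal, so everything passes.
        Console.WriteLine(string.Join(", ", Run(StringComparer.Ordinal)));
        // prints "rx, Rx, RX, linq, rx"

        // Case-insensitive comparison: the run "rx", "Rx", "RX" collapses to its first value.
        Console.WriteLine(string.Join(", ", Run(StringComparer.OrdinalIgnoreCase)));
        // prints "rx, linq, rx"
    }
}
```

Note that the final "rx" still comes through in both cases: only consecutive duplicates are muted, not every value seen earlier.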
Background: It’s essential to understand
how data flows through a “network” of interconnected observable sequences. In
the sample above, there are three sequences in the mix. First, there’s FromEventPattern
that listens on a classic .NET event and emits its values to subscribed
observers. Next, the Select operator takes care of carrying out a projection by
receiving values, transforming them and sending them out. Finally,
DistinctUntilChanged receives output from Select, filters out consecutive
duplicates and propagates the results to its observer. The figure below
illustrates how a subscription is set up and how data flows through the
operators.
Each operator is a little black box that knows how to propagate subscriptions to its source sequence(s), as well as how to take the data it receives and transform it to send it along (if desired). All of this works nicely till the point you see some data coming out in the observer and wonder where it comes from. To figure that out, a handy operator called Do lets you log the data that’s flowing through a “wire”.
This is the Rx form of “printf debugging”. The Do operator has several overloads that are similar to Subscribe’s and Run’s. Either you give it an observer to monitor the data that’s been propagated down through the operator or you can give it a set of handlers for OnNext, OnError and OnCompleted. Below is a sample of the operator’s use to see DistinctUntilChanged filtering out the duplicate values it receives:
var input = (from evt in Observable.FromEventPattern(txt, "TextChanged")
             select ((TextBox)evt.Sender).Text)
            .Do(inp => Console.WriteLine("Before DistinctUntilChanged: " + inp))
            .DistinctUntilChanged();
using (input.Subscribe(inp => Console.WriteLine("User wrote: " + inp)))
{
    Application.Run(frm);
}
With this in place, the output looks as follows. Here we produced the quirky duplicate input four times and yet the “User wrote” message only appears for the first such input.
Note: Feel free to attach Do probes to other operators as well. When using query expression syntax, you’ll have to revert to regular method calls (such as Where, Select) in order to insert the probing Do operator call.
4.
Back to our running sample, there’s yet another
problem with the user’s input. Since we’ll ultimately feed it to a web service
(which may be, as we said before, “pay for play” on a per-request basis), it’s
unlikely we want to send requests for every substring the user wrote while
entering input. Stated otherwise, we need to protect the web service against
fast typists.
Rx has an operator that can be used to “calm down” an observable sequence, called Throttle:
var input = (from evt in Observable.FromEventPattern(txt, "TextChanged")
select ((TextBox)evt.Sender).Text)
.Throttle(TimeSpan.FromSeconds(1)) // Exercise: what operator order is better?
.DistinctUntilChanged();
The way this works is that a timer is used to let an incoming message age for the specified duration, after which it can be propagated further on. If another message comes in during this timeframe, the original message gets dropped on the floor and replaced by the new one, which also resets the timer. For our sample, if the user types “reactive” without hiccups (i.e. no two consecutive changes are further apart than 1 second), no intermediate substrings will be propagated. When the user stops typing (after hitting ‘e’, causing the last changed event higher up), it takes one second before the input “reactive” is propagated down. Later on we’ll feed this entire sequence to a web service, which now cannot be called excessively due to a typist gone loose.
To illustrate the operator’s effect, let’s use the Do operator in conjunction with two projection operators, Timestamp and Select. The former takes an IObservable<T> and turns it into an IObservable<Timestamped<T>>; the latter, given the projection x => x.Value, does the opposite. Together they add or remove a timestamp at the point a message is received. This allows us to visualize timing information:
var input = (from evt in Observable.FromEventPattern(txt, "TextChanged")
select ((TextBox)evt.Sender).Text)
.Timestamp()
.Do(inp => Console.WriteLine("I: " + inp.Timestamp.Millisecond + " - " + inp.Value))
.Select(x => x.Value)
.Throttle(TimeSpan.FromSeconds(1))
.Timestamp()
.Do(inp => Console.WriteLine("T: " + inp.Timestamp.Millisecond + " - " + inp.Value))
.Select(x => x.Value)
.DistinctUntilChanged();
Note: We need to remove and reapply the timestamp around the Throttle operator call. If we didn’t, Throttle would simply propagate the original timestamped value. This technique allows us to see the real delta between entering Throttle and leaving it, which should be about 1 second (give or take a few milliseconds).
5.
As we’ve used the same three operators twice, let’s extract
the pattern into a specialized operator. Creating your own operators shouldn’t
be hard as you can see:
public static IObservable<T> LogTimestampedValues<T>(this IObservable<T> source,
Action<Timestamped<T>> onNext)
{
return source.Timestamp().Do(onNext).Select(x => x.Value);
}
Now we can rewrite the code to use our operator like this:
var input = (from evt in Observable.FromEventPattern(txt, "TextChanged")
select ((TextBox)evt.Sender).Text)
.LogTimestampedValues(x => Console.WriteLine("I: " + x.Timestamp.Millisecond
+ " - " + x.Value))
.Throttle(TimeSpan.FromSeconds(1))
.LogTimestampedValues(x => Console.WriteLine("T: " + x.Timestamp.Millisecond
+ " - " + x.Value))
.DistinctUntilChanged();
Below is the output for the sample of typing “reactive” with a mild hiccup after “re” and after “reac”. Both pauses were in the sub-second range, so nothing was propagated beyond the Throttle operator. However, when the user stopped typing, it took 1015ms before the “reactive” string was observed in the Do operator after the Throttle operator.
Note: It’s strongly encouraged to brainstorm for a moment how you’d write a Throttle operator on classic .NET events taking all complexities of timers, subscriptions, resource management, etc. into account. One of the core properties of Rx is that it allows reusable operators to be written, operating on a wide range of asynchronous data sources. This improves signal-to-noise ratio of user code significantly. In this particular sample, just two operators had to be added in order to tame the input sequence both for its data and for its timing behavior.
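As a starting point for that brainstorm, here is a rough sketch of what a hand-written throttle might look like without Rx. It is not how Rx implements Throttle; Debouncer<T> is a hypothetical helper built on System.Threading.Timer, and it glosses over details such as error propagation and a small race where a newer value can be emitted slightly early if it arrives just as the timer fires.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: forwards a value only after no newer value
// has been posted for 'dueTime'.
public sealed class Debouncer<T> : IDisposable
{
    private readonly object _gate = new object();
    private readonly TimeSpan _dueTime;
    private readonly Action<T> _onNext;
    private readonly Timer _timer;
    private T _latest;
    private bool _hasValue;
    private bool _disposed;

    public Debouncer(TimeSpan dueTime, Action<T> onNext)
    {
        _dueTime = dueTime;
        _onNext = onNext;
        // Created disabled; Post arms it.
        _timer = new Timer(_ => Flush(), null,
                           Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    }

    public void Post(T value)
    {
        lock (_gate)
        {
            if (_disposed) return;
            _latest = value;          // the older pending value is dropped
            _hasValue = true;
            _timer.Change(_dueTime, Timeout.InfiniteTimeSpan); // restart aging
        }
    }

    private void Flush()
    {
        T value;
        lock (_gate)
        {
            if (_disposed || !_hasValue) return;
            value = _latest;
            _hasValue = false;
        }
        _onNext(value);               // runs on a thread-pool thread
    }

    public void Dispose()
    {
        lock (_gate) { _disposed = true; }
        _timer.Dispose();
    }
}

public static class DebouncerDemo
{
    public static List<string> Run()
    {
        var seen = new List<string>();
        var done = new ManualResetEventSlim();
        using (var debouncer = new Debouncer<string>(
            TimeSpan.FromMilliseconds(200),
            v => { lock (seen) seen.Add(v); done.Set(); }))
        {
            debouncer.Post("r");
            debouncer.Post("re");
            debouncer.Post("rea");    // only this one should survive
            done.Wait(TimeSpan.FromSeconds(5));
        }
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Even this simplified version needs a lock, a timer, disposal logic and a callback that fires on another thread, and it is tied to one handler rather than being a reusable operator over any asynchronous source.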
Exercise 6 – Rx’s concurrency model and synchronization support
Objective: Rx imposes no restrictions on the concurrency of data sources and the operators that act upon them. This means that an observer’s On* methods can be called from any “context”, which typically boils down to threads. As an example, since an event in a UI framework is raised on the UI thread, FromEventPattern’s call to OnNext with the event’s data will also happen on that thread. In contrast, when a timer from the System.Threading namespace is used (e.g. through the use of the GenerateWithTime operator), its messages will come out on a different (threadpool worker) thread. Rx has ways to introduce concurrency, known as schedulers, and has operators that deal with synchronization onto a specific scheduler. We’ll have a look at those now.
1.
In the previous example, we’ve
been calming down a TextBox’s input using the time-based Throttle method. In
order for this operator to send out messages, it needs to start a timer that’s
used to age the incoming messages in the way we described before. Under the
debugger it’s pretty straightforward to see those messages are indeed received
on a worker thread:
2.
As UI-savvy readers know, updating the UI from a
thread other than the UI thread is a big no-no. To illustrate this point in the
context of Rx, let’s introduce a Label control on the form and update the
handler to update the label with what the user just wrote (after being
throttled and filtered for adjacent duplicates):
var txt = new TextBox();
var lbl = new Label { Left = txt.Width + 20 };
var frm = new Form {
Controls = { txt, lbl }
};
var input = (from evt in Observable.FromEventPattern(txt, "TextChanged")
select ((TextBox)evt.Sender).Text)
.Throttle(TimeSpan.FromSeconds(1))
.DistinctUntilChanged();
using (input.Subscribe(inp => lbl.Text = inp))
Application.Run(frm);
Running this piece of code will cause it to fail upon trying to assign to the label’s Text property since that code is run from a thread other than the UI thread.
Note: In Windows Forms, behavior depends on whether or not a debugger is attached to the process. To reproduce this behavior, make sure to run under the debugger (i.e. start with F5, not CTRL-F5).
Start by enabling first-chance exceptions from the Debug, Exceptions… dialog (CTRL+D,E), as illustrated below. This will break in the debugger before the exception gets propagated and gets a chance to terminate the process:
Trying to run the executable now, entering a term in the TextBox control (and waiting for 1 second for it to be propagated beyond the Throttle operator) produces the following result:
Background: If you’d have a peek at the call stack, you’d immediately see what we noticed in the previous step: the OnNext action is getting invoked from a background thread, with System.Concurrency near the bottom of the stack. This reveals Rx’s source of concurrency in so-called IScheduler primitives. Whenever Rx needs to introduce concurrency to make an operator do its job, it uses this namespace to call into a scheduler. All of the operators that deal with concurrency have overloads with a parameter that lets the user specify an IScheduler in case the default is not what’s desired. This said, the defaults were carefully chosen, so typically one doesn’t need to bother about those at all.
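The scheduler overloads mentioned above can be observed in a console program. The sketch below assumes a reference to the System.Reactive package and uses Observable.Timer rather than Throttle to keep it short; SchedulerOverloadDemo and DeliveryThread are names made up for this example. It reports the thread on which the timer’s value is delivered, first with the default scheduler and then with a scheduler passed in explicitly.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class SchedulerOverloadDemo
{
    // Returns the managed thread id on which the timer's single value arrives.
    public static int DeliveryThread(IScheduler scheduler = null)
    {
        var timer = scheduler == null
            ? Observable.Timer(TimeSpan.FromMilliseconds(50))             // default scheduler
            : Observable.Timer(TimeSpan.FromMilliseconds(50), scheduler); // caller's choice

        return timer
            .Select(_ => Thread.CurrentThread.ManagedThreadId)
            .Wait(); // blocks until the sequence completes
    }

    public static void Main()
    {
        Console.WriteLine("caller thread:            " + Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine("default scheduler:        " + DeliveryThread());
        Console.WriteLine("current-thread scheduler: " + DeliveryThread(CurrentThreadScheduler.Instance));
    }
}
```

With the default scheduler the value should arrive on a worker thread, while the current-thread scheduler keeps the work on the calling thread (which then sleeps for the due time).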
3.
So, how do we solve this issue? Obviously one could
click the link “How to make cross-thread calls to Windows Forms controls” as
highlighted above, to learn the Invoke method is all you need to solve the
issue. However, the deeper construct needed here is a way to jump between
schedulers, in this case from some background thread onto the UI thread. Thanks
to the powerful first-class object nature of observable sequences, we can
provide the user with an operator that does precisely this. Also, this solution
is more generic than a fix that just overcomes the single-threaded UI
limitation, as it allows one to jump between any two IScheduler “contexts”.
This operator is called ObserveOn and is shown below:
var input = (from evt in Observable.FromEventPattern(txt, "TextChanged")
select ((TextBox)evt.Sender).Text)
.Throttle(TimeSpan.FromSeconds(1))
.DistinctUntilChanged();
using (input.ObserveOn(lbl).Subscribe(inp => lbl.Text = inp))
Application.Run(frm);
You will notice that the new version of the code fails to compile, with the error message: “'System.IObservable<string>' does not contain a definition for 'ObserveOn'…”.
The most generic overload of ObserveOn takes an IScheduler. For convenience, an overload is provided that takes in a Windows Forms Control. This extension method is tied to the System.Windows.Forms namespace and resides inside the System.Reactive.Windows.Forms assembly. To make the code resolve correctly, we need to add an assembly reference to System.Reactive.Windows.Forms.
WPF users can use the ObserveOnDispatcher method to use the current dispatcher, which resides in the System.Reactive.Windows.Threading assembly.
Background: In past designs of Rx, the team experimented with other ways to express the thread affinity of messages generated by observable sequences. One was to have a global context property and a related one was to follow the F# model where a SynchronizationContext for the UI thread is used to synchronize every single message on. Those approaches severely hampered performance and scalability of Rx. The use of IScheduler led to much more flexibility on behalf of the user and operators like ObserveOn flow nicely like any other operator. In this world, the philosophy is to defer any synchronization tasks till the last point in time, i.e. right before some UI binding is made. This approach offers the best performance characteristics.
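The IScheduler-based overload of ObserveOn mentioned earlier can be tried without any UI framework. The sketch below assumes a reference to the System.Reactive package and uses EventLoopScheduler, a scheduler that runs work on one dedicated thread, as a stand-in for a UI thread; ObserveOnDemo is a made-up name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class ObserveOnDemo
{
    public static (int caller, IList<int> observed) Run()
    {
        int caller = Thread.CurrentThread.ManagedThreadId;

        using (var loop = new EventLoopScheduler())
        {
            var observed = Observable.Range(1, 3)
                .ObserveOn(loop) // hop onto the event loop's thread
                // Everything downstream of ObserveOn runs on that thread.
                .Select(_ => Thread.CurrentThread.ManagedThreadId)
                .ToList()
                .Wait();

            return (caller, observed);
        }
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("caller thread:    " + result.caller);
        Console.WriteLine("observed threads: " + string.Join(", ", result.observed));
    }
}
```

All three values should report the same thread id, different from the caller’s, which is the same kind of hop that ObserveOn(lbl) performs onto the UI thread.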
Exercise 7 – Bridging the asynchronous method pattern with Rx
Objective: In exercise 3, we learned
how to bring classic .NET events to Rx by means of the FromEventPattern
operator. Other asynchronous data sources exist in the .NET Framework, one of
the most notable being the plethora of asynchronous method patterns. In this
design pattern, two methods are provided. One method is used to start the
computation and returns an IAsyncResult “handle” that’s fed into the second
method to acquire the result of the computation. All of this is pretty
cumbersome to use in a manual fashion and often leads to spaghetti code with
the initial call in one place and the receive logic in another spot. We’ll now
explore how to expose such asynchronous data sources as observables.
1.
In our running sample, we’ve been building up a simple dictionary suggest application. Upon the user entering a search term, the application will fire off a call to a web service to get word suggestions back. Since we don’t want to block the UI, we’ll want to keep the communication with the dictionary service asynchronous too.
The online dictionary we’ll be using can be found at http://www.dict.org. One of its disadvantages for direct consumption is the (seemingly arcane) Dictionary Server Protocol (RFC 2229) being used. To leverage our existing toolset, we’ll be much better off with a web service wrapper around it which can be found at http://services.aonaware.com/DictService.
Note: It’d be a good exercise to provide a native wrapper for the underlying Dictionary Server Protocol using Rx. Since TCP primitives in the System.Net namespace expose asynchronous operations as well, wrapping those should not be any harder than using a web service as we’re doing here. That said, dealing with byte arrays (which the author really likes) received from a TCP socket obviously leads to more clutter.
As a little experiment with the service, choose the MatchInDict web method and enter wn for the dictionary identifier (for WordNet ® 2.0, which can be found through the DictionaryList method), reac for the word, and prefix for the strategy. Verify that hitting Invoke produces a bunch of words starting with “reac”:
2.
Keeping our existing code with the TextBox, we’ll
first focus on bridging with the web service. To perform those experiments,
comment out your current code to have a clean Main method playground. The first
thing to do is to add a reference to the web service using the Add Service
Reference… entry in Solution Explorer when right-clicking the project or
References node underneath it. This will create a new WCF service proxy.
Note: Classic web service client proxies could be used as well but have a different asynchronous invocation mechanism based on classic .NET events. The new WCF approach – using the asynchronous method pattern – fits better in our sample to illustrate another Rx bridge operator.
In the dialog that appears, enter http://services.aonaware.com/DictService/DictService.asmx for the service address and click Go. Change the Namespace field to DictionarySuggestService. Don’t click OK yet.
Before clicking OK, go to the Advanced… dialog and tick the “Generate asynchronous operations” option:
3.
With our service client
generated, we’re ready to make web service calls. Ignore any Custom tool
warnings for the References.svcmap. Despite those warnings, the generated proxy
should work fine for our purposes. Let’s first have a look at the code involved
in manually invoking the service in an asynchronous manner:
var svc = new DictServiceSoapClient("DictServiceSoap");
svc.BeginMatchInDict("wn", "react", "prefix",
iar => {
var words = svc.EndMatchInDict(iar);
foreach (var word in words)
Console.WriteLine(word.Word);
},
null
);
Console.ReadLine();
The BeginMatchInDict method starts a web service call. Besides taking all of the parameters to be passed to the web method, it also takes an AsyncCallback delegate. This delegate gets invoked when the service’s response has been received. This code is quite clumsy, and the data aspect of the asynchronous call is not immediately apparent. Furthermore, composition with other asynchronous data sources (such as our TextBox) becomes quite hard. It’s also not clear how one can cancel outstanding requests, such that the callback procedure is guaranteed not to be called anymore. Dealing with error cases becomes hard too. Those complexities closely resemble the ones we called out for .NET events…
For completeness, here are the results you should expect to get back:
4.
Converting the above fragment to Rx isn’t very hard using the FromAsyncPattern method, which comes in a whole bunch of generic overloads for various Begin* method parameter counts. The generic parameters passed to this bridge method are the types of the Begin* method parameters, as well as the return type of the corresponding End* method. The arguments to FromAsyncPattern are delegates to those Begin* and End* methods:
var svc = new DictServiceSoapClient("DictServiceSoap");
var matchInDict = Observable.FromAsyncPattern<string, string, string, DictionaryWord[]>
(svc.BeginMatchInDict, svc.EndMatchInDict);
The result of this bridging is a Func delegate that takes the web service parameters and produces an observable sequence that will receive the results:
var res = matchInDict("wn", "react", "prefix");
var subscription = res.Subscribe(words => {
foreach (var word in words)
Console.WriteLine(word.Word);
});
Console.ReadLine();
To make this more clear, let’s get explicit about the types inferred for all of the calls shown above:
In contrast to .NET events (which are not parameterized), asynchronous method calls need input to operate on. To make a bridge to such a Begin-End method pair reusable, the return type is a function that accepts the parameters to the underlying Begin* method.
Notice the bridge exhibits beneficial properties such as the explicit data-intensive nature reflected in the return type of the function (here an IObservable of a DictionaryWord-array). Other advantages include reusability and the option to unsubscribe from the asynchronous call. Since the result of calling the delegate returned from the FromAsyncPattern method returns an observable sequence, it can be used for further composition (as we shall see in a moment).
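For the calls in this step, the types work out as follows: matchInDict is a Func<string, string, string, IObservable<DictionaryWord[]>>, res is an IObservable<DictionaryWord[]>, and subscription is an IDisposable. The same shape can be tried without the web service. The sketch below is a stand-in, not the lab’s code: it assumes a reference to the System.Reactive package and bridges the BeginRead/EndRead pair of a MemoryStream, with every type written out explicitly.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Text;

public static class FromAsyncPatternDemo
{
    public static string Run()
    {
        var stream = new MemoryStream(Encoding.UTF8.GetBytes("reactive"));

        // Generic arguments: the three Begin* parameter types (byte[], int, int)
        // followed by the End* return type (int).
        Func<byte[], int, int, IObservable<int>> read =
            Observable.FromAsyncPattern<byte[], int, int, int>(
                stream.BeginRead, stream.EndRead);

        var buffer = new byte[16];

        // Calling the function starts the operation and hands back its result
        // as an observable sequence with a single element.
        IObservable<int> res = read(buffer, 0, buffer.Length);

        int bytesRead = res.Wait(); // block for the demo only
        return Encoding.UTF8.GetString(buffer, 0, bytesRead);
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

Blocking with Wait is only done here to keep the console sample short; in the lab’s scenario you would Subscribe instead, as shown above.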
5.
Since we’ll always use our
service with the same dictionary and strategy, let’s simplify the function that
acts as our entry-point to the web service. This is a trivial bit of functional
programming where we simply wrap the function returned from FromAsyncPattern
(which takes 3 parameters) in a function (with only one parameter, i.e. the
term searched by the user) that supplies the two fixed parameter values:
var svc = new DictServiceSoapClient("DictServiceSoap");
var matchInDict = Observable.FromAsyncPattern<string, string, string, DictionaryWord[]>
(svc.BeginMatchInDict, svc.EndMatchInDict);
Func<string, IObservable<DictionaryWord[]>> matchInWordNetByPrefix =
term => matchInDict("wn", term, "prefix");
var res = matchInWordNetByPrefix("react");
var subscription = res.Subscribe(words => {
foreach (var word in words)
Console.WriteLine(word.Word);
});
Console.ReadLine();
Running this piece of code should produce the same results as shown in step 3.
6.
Since web services can easily fail, we should say a word or two on error handling. Try running the service with a single letter as its input, e.g. “r”. Due to the data volume returned by the server, the System.ServiceModel layer triggers an error stating that a quota has been exceeded.
We don’t really care about the specifics of this error but it should be common wisdom that in the world of distributed and asynchronous programming errors are not that exceptional. Rx is particularly good at dealing with errors due to the separate observer’s OnError channel to signal those. If we were to change our sample as shown below, the error would be handled by the OnError function that’s part of the observer:
var res = matchInWordNetByPrefix("r"); // a single-letter term triggers the quota error
var subscription = res.Subscribe(
words =>
{
foreach (var word in words)
Console.WriteLine(word.Word);
},
ex =>
{
Console.Error.WriteLine(ex.Message);
}
);
Note: Rx has exception handling operators such as Catch, Finally, OnErrorResumeNext and Retry which allow taking a compositional approach to error handling. Related operators include Materialize and Dematerialize, which allow turning an IObservable<T> into an IObservable<Notification<T>> and vice versa. Such a notification represents an observer’s possible messages as data, i.e. an OnNext object, an OnError object or an OnCompleted one. We won’t elaborate on the rich exception handling operators present in Rx and will keep things simple by using an OnError handler passed to Subscribe. If you want to make the application more robust in the presence of short input strings, you could either pre-filter the user input observable sequence (using a where clause on the length of the string) or fix up the underlying WCF setting as mentioned in the error text.
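To give a flavor of that compositional style, the sketch below combines Retry and Catch. It assumes a reference to the System.Reactive package, and the failing source is simulated with Observable.Defer and Observable.Throw rather than a real service call; ErrorHandlingDemo and RetryThenFallback are made-up names.

```csharp
using System;
using System.Reactive.Linq;

public static class ErrorHandlingDemo
{
    public static int Attempts;

    // Simulates a call that fails 'failures' times before succeeding.
    public static string RetryThenFallback(int failures)
    {
        Attempts = 0;

        // Defer runs the factory on every subscription, so each retry
        // counts as a new attempt.
        var flaky = Observable.Defer(() =>
        {
            Attempts++;
            return Attempts <= failures
                ? Observable.Throw<string>(new TimeoutException("attempt " + Attempts + " failed"))
                : Observable.Return("ok after " + Attempts + " attempts");
        });

        return flaky
            .Retry(3) // at most three subscriptions in total
            .Catch<string, Exception>(ex => Observable.Return("fallback: " + ex.Message))
            .Wait();
    }

    public static void Main()
    {
        Console.WriteLine(RetryThenFallback(2));
        Console.WriteLine(RetryThenFallback(5));
    }
}
```

The error policy lives in the query itself, so the observer passed to Subscribe would only ever see a value, never the intermediate failures.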
7.
One general issue with distributed programming we
should call out is the potential for out-of-order arrival of responses. In the
next exercise, we’ll compose the input sequence from the TextBox control with
web service calls, so multiple requests may be running at the same time. For
example, if the user types “reac”, waits one second (for Throttle to forward
the string to its observers), then proceeds with typing “reactive” and waits
another second, both web service calls will be in flight. The response to the
second call may arrive before the response to the first one does. This is not too
far-fetched even for this simple sample: as there’ll be more words starting
with “reac” than with “reactive”, the first request will be more
network-intensive than the second one.
We can mimic this situation quite easily by starting a couple of web service requests for “incremental strings” and observing the order in which the answers come back:
var input = "reactive";
for (int len = 3; len <= input.Length; len++)
{
    var req = input.Substring(0, len);
    matchInWordNetByPrefix(req).Subscribe(
        words => Console.WriteLine(req + " --> " + words.Length)
    );
}
If you run the fragment above a couple of times, you should be (un)lucky enough to hit an out-of-order arrival situation. While requests for “rea”, “reac”, “react”, “reacti”, “reactiv” and “reactive” are started in that order, results may come back in a different order as shown below:
Conclusion: Wrapping the omnipresent but hard to use asynchronous method pattern
in an observable sequence is easy with the FromAsyncPattern method. Specifying
the Begin and End method pair, one gets back a function that can be called to
obtain an observable sequence providing the results in an asynchronous manner.
Using this technique, we illustrated how to wrap a WCF proxy’s asynchronous
method to call a dictionary service. Finally, we pinpointed a couple of
asynchronous computing caveats such as errors and timing issues. Both of those
will be tackled in the next exercise.
Exercise 8 – SelectMany: the Zen of composition
Objective: With a tamed
input sequence and a bridge function for an asynchronous web service call, we’re
ready to glue together both pieces into a single application. For every term
entered by the user, we want to reach out to the service to obtain word
suggestions. In doing so, we’ll have to make sure to deal with potential
out-of-order arrival of results as exposed in the previous exercise.
1.
First, we’ll extend the application’s
UI to include a ListBox control that will be populated with the results that
come back from the web service. Depending on the reader’s preferences, a UI
designer can be used or the following code can be pasted in the Main method:
var txt = new TextBox();
var lst = new ListBox { Top = txt.Height + 10 };
var frm = new Form {
    Controls = { txt, lst }
};
Assuming the Application.Run call is still in place, the following UI should be displayed:
2.
Also restore the following piece of code that’s used
to obtain user input in a throttled and free-of-duplicates manner:
var input = (from evt in Observable.FromEventPattern(txt, "TextChanged")
             select ((TextBox)evt.Sender).Text)
            .Throttle(TimeSpan.FromSeconds(1))
            .DistinctUntilChanged()
            .Do(x => Console.WriteLine(x));
Unlike in previous exercises, we’re not going to subscribe to the input sequence directly. The Do operator added at the end will be used to visualize the requests that will be sent to the web service further on.
Note: The Do method doesn’t have to be passed a lambda expression; where the signatures match, it can take a method group instead. In fact, this form of assigning to a delegate has been available since C# 1.0. In the above we could simply write .Do(Console.WriteLine). Now that we have lambda expressions, people tend to forget about this syntax and write .Do(x => Console.WriteLine(x)) all the time. Shortening the lambda form to the method group is in fact rooted in a well-known concept in functional programming theory, known as eta reduction.
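The equivalence of the two delegate forms can be checked without Rx at all. The following is a small sketch using only the Base Class Library; the MethodGroupSketch class and the list used as a sink are hypothetical names introduced for this illustration. Since Do accepts an Action<T>, either of the two delegates could be handed to it.

```csharp
using System;
using System.Collections.Generic;

public static class MethodGroupSketch
{
    public static List<string> Run()
    {
        var sink = new List<string>();

        Action<string> viaLambda = x => sink.Add(x);   // lambda wrapping a single call
        Action<string> viaMethodGroup = sink.Add;      // the same call as a method group

        viaLambda("lambda");
        viaMethodGroup("method group");
        return sink;
    }
}
```

Both delegates end up calling the very same Add method, which is why the shorter method group form is a safe replacement whenever the lambda does nothing but forward its argument.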
3.
Next, put the Rx-based web
service wrapper in place:
var svc = new DictServiceSoapClient("DictServiceSoap");
var matchInDict = Observable.FromAsyncPattern<string, string, string, DictionaryWord[]>
    (svc.BeginMatchInDict, svc.EndMatchInDict);
Func<string, IObservable<DictionaryWord[]>> matchInWordNetByPrefix =
    term => matchInDict("wn", term, "prefix");
4.
At this point we have two
things: an observable sequence of input strings and a function that takes a
string and produces an observable sequence containing the corresponding words.
How do we glue those two together? The answer to this question lies in the
incredibly powerful SelectMany operator. One of its overloads is shown below:
IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector);
It turns out we have all the ingredients SelectMany needs to do its magic: we have an IObservable<TSource> which in our case contains strings. We also
have a function mapping those strings onto an IObservable<TResult>. What the SelectMany operator can do using
those inputs is bind them together.
Most readers will be familiar with this operator in a possibly unconscious manner. Every time you query some database traversing a relationship between tables, you’re really dealing with SelectMany. For example, assume you want to get all the suppliers across all the products in a store. Starting from a sequence of Product objects and a way to map a Product onto a Supplier (e.g. a function retrieving a Product’s SuppliedBy property), the operator can give us a flattened list of Supplier objects across all Product objects. The following figure illustrates this binding operation for our scenario of dictionary suggest:
Note: The cardinality of the web service output may be a bit confusing at first. Being an IObservable<DictionaryWord[]> sequence, i.e. one containing arrays of DictionaryWord objects, subscribing means you’ll get zero or more DictionaryWord arrays in an asynchronous manner. In this particular scenario, we’ll receive a single array which will contain matching words (possibly empty). If a service existed that notifies the client about results matching the query as they are found during a dictionary scan, the resulting type could be an IObservable<DictionaryWord>.
5.
Let’s put SelectMany into motion now. Instead of
using the operator as a method call, we can use C#’s query expression syntax
where the use of SelectMany is triggered by the presence of multiple from
clauses. This gets translated into one of the SelectMany overloads:
var res = from term in input
          from words in matchInWordNetByPrefix(term)
          select words;
This is all we need to do to compose the two asynchronous computations, the result still being asynchronous by itself as well. Herein lies the power of Rx: retaining the asynchronous nature of computations in the presence of rich composition operators (or “combinators”).
Background: SelectMany is one of the most powerful operators of Rx. Its power is rooted in the underlying theory of monads, leveraged by LINQ in general. While monads may sound like a scary disease, they really are a rather simple concept. In the world of monads, the SelectMany operation is called bind. Its purpose in life is to take an object “in the monad”, apply a function to it to end up with another object “in the monad”. It’s basically some kind of function application on steroids, threading a concern throughout the computation.
To make it more concrete, take the case of LINQ to Objects. Here the monad is IEnumerable<T>. Given such a sequence and a function to map an element T
onto another IEnumerable<R> sequence, we can get a flattened
IEnumerable<R> sequence across all of the original sequence’s T elements.
Feel free to think products and suppliers if it helps. In Rx, the only
difference is that we’re now dealing with IObservable sequences. No matter
which monad we choose, the characteristic of SelectMany stays the same: start
in the monad, perform some function, and you’re still in the monad. An everyday
sample is clock arithmetic. Starting with the second hand in the range 0
through 59, applying any function (e.g. add 10 seconds) will continue to provide
a result that falls within the clock range. You can’t fall off the clock (read:
you can’t leave the monad)!
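For readers who like to think in products and suppliers, the LINQ to Objects case can be sketched as follows. The Product type, its Suppliers property and the sample data are assumptions made up for this illustration. The sketch shows that a query expression with two from clauses and an explicit SelectMany call produce the same flattened sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SelectManySketch
{
    // Hypothetical Product type; the Suppliers property is an assumption for this sketch.
    public sealed class Product
    {
        public string Name { get; set; }
        public string[] Suppliers { get; set; }
    }

    public static Product[] Products()
    {
        return new[]
        {
            new Product { Name = "Tea",    Suppliers = new[] { "Acme", "Globex" } },
            new Product { Name = "Coffee", Suppliers = new[] { "Initech" } },
        };
    }

    // Query expression form: the second from clause triggers SelectMany.
    public static List<string> ViaQuery()
    {
        return (from product in Products()
                from supplier in product.Suppliers
                select supplier).ToList();
    }

    // Explicit method call form yielding the same flattened sequence.
    public static List<string> ViaMethod()
    {
        return Products().SelectMany(product => product.Suppliers).ToList();
    }
}
```

We start with an IEnumerable of products and end with an IEnumerable of supplier names: the sequence shape is preserved while the nesting is flattened away. The Rx version does the same thing for IObservable sequences.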
6.
The next step is to bind the
results to the UI. In order to receive the results, we have to subscribe to the
resulting observable sequence. No matter how complex the composition we’re
talking about is, the result is still lazy. In this case, the SelectMany use has
returned an IObservable<DictionaryWord[]> which won’t do anything till we
make a call to Subscribe. From that point on, throttled user input will trigger
web service calls whose results are sent to the observer passed to Subscribe:
using (res.Subscribe(words =>
{
    lst.Items.Clear();
    lst.Items.AddRange((from word in words select word.Word).ToArray());
}))
    Application.Run(frm);
In this piece of code, we receive the words in the OnNext handler. After clearing the elements in the ListBox control, we use regular LINQ to Objects over the array of DictionaryWord objects to get the Word property (which is of type string) and pass an array of words to AddRange. This additional step is needed since the dictionary service hands back DictionaryWord objects which contain not only the word itself, but also the dictionary the word was found in (useful information when searching multiple dictionaries using other web methods).
7.
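One problem that lurks around
the corner is the thread the OnNext handler is called on. Since we’re receiving
data from the web service in an asynchronous manner, a threadpool thread will be
used by WCF to deliver the results to our code. Windows Forms controls may only
be accessed from the thread that created them, so touching the ListBox from that
handler is an illegal cross-thread operation.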
By now, the reader should know how to fix this issue: use ObserveOn.
using (res.ObserveOn(lst).Subscribe(words =>
{
…
With this fix in place, the result of running the code is shown below. Notice the output of the Do operator:
8.
However, there are two more
problems waiting to get us. We pointed those out in the previous exercise.
Let’s start with the simpler of the two: what if the service returns an error?
In such a case, Rx’s SelectMany operator will propagate the exception down to
the observer’s OnError handler. Since we don’t have such a handler in our code
(yet), the exception will bring down the application. To solve this issue, we
add an OnError handler:
using (res.ObserveOn(lst).Subscribe(
    words =>
    {
        lst.Items.Clear();
        lst.Items.AddRange((from word in words select word.Word).ToArray());
    },
    ex =>
    {
        MessageBox.Show(
            "An error occurred: " + ex.Message, frm.Text,
            MessageBoxButtons.OK, MessageBoxIcon.Error
        );
    }))
{
    Application.Run(frm);
}
Entering a single letter, resulting in an excessive return data volume, triggers an exception in the WCF stack, which now gets handled by the error handler code passed to Subscribe:
Note: Once the exception has been received, no further notifications are delivered: an OnError message terminates the sequence.
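The terminating nature of OnError can be observed in isolation with a small sketch. It assumes the System.Reactive package; the ErrorSketch class, the Subject standing in for user input and the lookup stub (which fails for short terms instead of calling a real service) are hypothetical and only serve this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ErrorSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var input = new Subject<string>();   // stand-in for the user input sequence

        // Hypothetical service stub: fails for short terms, otherwise returns one array.
        Func<string, IObservable<string[]>> lookup = term =>
            term.Length < 3
                ? Observable.Throw<string[]>(new InvalidOperationException("term too short"))
                : Observable.Return(new[] { term + "ion" });

        var res = input.SelectMany(lookup);

        using (res.Subscribe(
            words => log.Add("OnNext: " + string.Join(",", words)),
            ex => log.Add("OnError: " + ex.Message)))
        {
            input.OnNext("react");      // delivered to the OnNext handler
            input.OnNext("r");          // the inner error ends the whole sequence
            input.OnNext("reactive");   // never reaches the observer
        }
        return log;
    }
}
```

The log ends up with one OnNext entry and one OnError entry; the third term is silently ignored. In a real application one would typically use operators such as Catch or Retry to keep the sequence alive after a failed request.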
9.
A seemingly tougher problem is that of out-of-order
arrival as mentioned at the end of the previous exercise. To understand why
this can happen at all in the context of our composition using SelectMany, we
need to elaborate on the workings of the operator. Whenever the SelectMany
operator receives an input on its source, it evaluates the selector function to
obtain an observable sequence that will provide data for the operator’s output.
Essentially it flattens all of the observable sequences obtained in this manner
into one flat resulting sequence.
For example, if the user types “react” and idles out for at least one second, SelectMany receives the string from the Throttle operator preceding it. Execution of the selector function, matchInWordNetByPrefix, causes a web service call to be started. While this call is in flight, the user may enter “reactive”, which may end up triggering another web service call in a similar manner (one second throttle delay, application of the selector function). At that point, two calls are in flight in parallel, which could provide results out of order compared to the input.
The figure below illustrates the issue that can arise due to this behavior:
Since throttling adds a one second delay, you have to be a bit (un)lucky to hit this issue. However, if it remains unfixed, it’d likely come and get you the first time you demo the application to your boss. To show the issue’s presence, take out the Throttle operator and type the word “reactive” without hesitation. In order to avoid hitting the message size limit, filter out short terms as well:
var input = (from evt in Observable.FromEventPattern(txt, "TextChanged")
             select ((TextBox)evt.Sender).Text)
            .Where(term => term.Length >= 3)
            //.Throttle(TimeSpan.FromSeconds(1))
            .DistinctUntilChanged()
            .Do(Console.WriteLine);
After a few attempts you should see an out-of-order response issue, such as the one shown below:
In order to solve this issue, we need to cancel out existing web service requests as soon as the user enters a new term, indicating there’s no further interest in the previous search term. What we want to achieve is illustrated in the figure below. The essence of the fix is “crossing out” or “muting” in-flight requests when a new one is received. This is illustrated as the red cross on the line for the first web service call. It turns out applying this fix is incredibly easy using Rx operators; moreover, there are a couple of ways to solve the issue. We’ll show two different approaches here.
Note: At some point in the design of Rx, there was a special SelectMany operator with cancellation behavior: whenever an object was received on the source, the previous (if any) inner observable was unsubscribed before calling the selector function to get a new inner observable. As you may expect, this led to numerous debates about which of the two behaviors was desired. Having two different operator flavors for SelectMany was not a good thing for various reasons. For one thing, only one flavor could be tied to the C# and Visual Basic LINQ syntax. The ultimate solution was to decouple the notion of cancellation from the concept of “monadic bind”. As a result, new cancellation operators arose, which do have their use in a lot of other scenarios too. A win-win situation!
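Since hitting the out-of-order situation against a real service is a matter of luck, it can help to reproduce it deterministically. The sketch below assumes the System.Reactive package and replaces both the TextBox and the web service by hand-driven Subject objects; the OutOfOrderSketch class, the pending dictionary and the result strings are hypothetical names for this illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class OutOfOrderSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var input = new Subject<string>();   // stand-in for the user input

        // Hypothetical service stub: one pending response per term, completed by hand.
        var pending = new Dictionary<string, Subject<string>>
        {
            { "react", new Subject<string>() },
            { "reactive", new Subject<string>() },
        };
        Func<string, IObservable<string>> lookup = term => pending[term];

        // Plain SelectMany merges the results of all requests that are in flight.
        using (input.SelectMany(lookup).Subscribe(result => log.Add(result)))
        {
            input.OnNext("react");
            input.OnNext("reactive");
            pending["reactive"].OnNext("results for reactive");  // second request answers first
            pending["react"].OnNext("results for react");        // stale answer arrives last
        }
        return log;
    }
}
```

The stale answer for “react” is logged after the one for “reactive”, so a UI that simply shows the latest result would end up displaying suggestions for a term the user has already moved away from.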
10.
The realization of this cancellation behavior can be
achieved using a single operator called TakeUntil whose behavior is to take
elements from an observable sequence until another “signal” observable tells it
to stop. One can compare it to a blowout preventer on an oil pipeline. While
distasteful jokes about oil could be made, we’ll refrain from doing
so. Suffice it to say our operator does
work and has been well-tested.
What should be the valve to shut down receiving results from an ongoing web service request in our case? The user entering a new input string is what we need. So, all we have to do is stick a TakeUntil call on the result of calling the web service method. Let’s put it to the test and tweak our code as shown below:
var res = from term in input
          from word in matchInWordNetByPrefix(term).TakeUntil(input)
          select word;
Recapping what this code does, the input sequence produces terms based on (throttled) user input. When a term is received from the input, the SelectMany operator hidden behind the second from keyword kicks in and executes the selector function passing it the received term:
term => matchInWordNetByPrefix(term).TakeUntil(input)
In here, the resulting observable sequence is provided by the TakeUntil operator, which in turn consumes input from a web service call. It’s important to notice that the result of calling the selector function is an observable sequence which hasn’t been subscribed to yet, so the web service call is not taking place yet. However, the very next thing the SelectMany operator does is subscribe to the observable obtained from the selector function. At that point, TakeUntil does its job. It subscribes to both its source (which is the web service call getting made, ticking away to produce its array of DictionaryWord objects) and the “valve” observable. From this point on, the TakeUntil operator will pass on any of the data it receives until the valve is signaled. In this case, the valve simply corresponds to user input. As a result, the next time the user types something, the valve is closed for the web service request that’s in flight. When SelectMany also sees the user’s new input, it repeats the steps we mentioned here. And so on…
Visualizing the cancellation behavior can be achieved using another operator. Whereas Do monitors data flowing through an observable “pipeline”, the Finally operator can be used to perform an action when the subscription is disposed. While it’s normally used for clean-up operations, it comes in handy for the kind of logging we need here:
var res = from term in input
          from word in matchInWordNetByPrefix(term)
                       .Finally(() => Console.WriteLine("Disposed request for " + term))
                       .TakeUntil(input)
          select word;
The result of the above is shown below. When TakeUntil shuts down the valve it gets rid of the source by disposing the subscription it holds to it. This causes Finally to log the message:
Note: Why are we seeing the Do-based logging for input twice now? The answer is pretty straightforward: both the SelectMany operator (in the first from clause) and the TakeUntil operator are subscribed to the input sequence. When side-effects are involved with subscriptions (e.g. paying a fee), you may want to share a single underlying subscription. Operators such as Publish exist for this purpose but fall outside the scope of this HOL.
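The valve behavior of TakeUntil can also be checked without a user interface or a web service. The following sketch assumes the System.Reactive package; as before, the TakeUntilSketch class, the Subject standing in for user input and the hand-completed pending responses are hypothetical stand-ins. Because the stand-in input is a Subject without side effects, subscribing to it twice does no harm here.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TakeUntilSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var input = new Subject<string>();   // hot stand-in for the user input

        // Hypothetical service stub: one pending response per term, completed by hand.
        var pending = new Dictionary<string, Subject<string>>
        {
            { "react", new Subject<string>() },
            { "reactive", new Subject<string>() },
        };

        Func<string, IObservable<string>> lookup = term =>
            pending[term]
                .Finally(() => log.Add("Disposed request for " + term))
                .TakeUntil(input);   // a new term closes the valve

        using (input.SelectMany(lookup).Subscribe(result => log.Add(result)))
        {
            input.OnNext("react");
            input.OnNext("reactive");                             // mutes the request for "react"
            pending["reactive"].OnNext("results for reactive");
            pending["react"].OnNext("results for react");         // late answer, no longer observed
        }
        return log;
    }
}
```

In this run the late answer for “react” never shows up in the log, while the Finally action records that the request for “react” was disposed as soon as the next term came in.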
The resulting code for the entire application fits on less than a page. Imagine achieving the same composition of events with asynchronous web services this concisely without Rx, while taking care of all the edge cases we discussed:
var txt = new TextBox();
var lst = new ListBox { Top = txt.Height + 10 };
var frm = new Form {
    Controls = { txt, lst }
};
// Turn the user input into a tamed sequence of strings.
var textChanged = from evt in Observable.FromEventPattern(txt, "TextChanged")
                  select ((TextBox)evt.Sender).Text;
var input = textChanged
    .Throttle(TimeSpan.FromSeconds(1))
    .DistinctUntilChanged();
// Bridge with the web service's MatchInDict method.
var svc = new DictServiceSoapClient("DictServiceSoap");
var matchInDict = Observable.FromAsyncPattern<string, string, string, DictionaryWord[]>
    (svc.BeginMatchInDict, svc.EndMatchInDict);
Func<string, IObservable<DictionaryWord[]>> matchInWordNetByPrefix =
    term => matchInDict("wn", term, "prefix");
// The grand composition connecting the user input with the web service.
var res = from term in input
          from word in matchInWordNetByPrefix(term).TakeUntil(input)
          select word;
// Synchronize with the UI thread and populate the ListBox or signal an error.
using (res.ObserveOn(lst).Subscribe(
    words => {
        lst.Items.Clear();
        lst.Items.AddRange((from word in words select word.Word).ToArray());
    },
    ex => {
        MessageBox.Show("An error occurred: " + ex.Message, frm.Text,
                        MessageBoxButtons.OK, MessageBoxIcon.Error);
    }))
{
    Application.Run(frm);
} // Proper disposal happens upon exiting the application.
11.
Since this issue is quite
common, a specialized operator called Switch was introduced. Given a sequence
of sequences (yes, that’s not a typo) it hops from one sequence to
another as they come in. Using a line diagram this can be made clear in an easy
manner:
When the topmost observable “outer” sequence produces a new observable “inner” sequence, an existing inner sequence subscription is disposed and the newly received sequence is subscribed to. Results produced by the current inner sequence are propagated to the Switch operator’s output.
Use of this operator in our scenario proceeds as follows. First we map user input onto web service requests using a simple Select operator. Since every web service request returns an IObservable<DictionaryWord[]>, the result of this projection is an IObservable<IObservable<DictionaryWord[]>>. Applying Switch over this nested sequence causes the behavior described above. For every request submitted by the user, the web service is contacted. If a new request is made, Switch cancels the existing request’s subscription and hops to the new one:
var res = (from term in input
           select matchInWordNetByPrefix(term))
          .Switch();
Which approach one chooses is purely a matter of personal taste. Both solve the issue equally well.
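The hopping behavior of Switch can also be observed in isolation, without the web service. The following is a minimal console sketch, assuming the System.Reactive package is referenced; the SwitchDemo class and the two subjects standing in for web service responses are hypothetical and not part of the lab code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class SwitchDemo
{
    // Returns the values that reach an observer of requests.Switch().
    public static List<string> Run()
    {
        var seen = new List<string>();

        // Hypothetical stand-ins for two pending web service responses.
        var first = new Subject<string>();
        var second = new Subject<string>();

        // The outer sequence plays the role of the stream of requests.
        var requests = new Subject<IObservable<string>>();

        using (requests.Switch().Subscribe(s => seen.Add(s)))
        {
            requests.OnNext(first);    // request 1 is in flight
            requests.OnNext(second);   // request 2 supersedes request 1
            first.OnNext("stale result for request 1");   // dropped: Switch already unsubscribed
            second.OnNext("result for request 2");        // propagated to the output
        }

        return seen;
    }
}
```

Calling SwitchDemo.Run() yields a list containing only "result for request 2", since the subscription to the first inner sequence is disposed as soon as the second one arrives.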
Conclusion: Composition of multiple asynchronous data sources is one of the main strengths of Rx. In this exercise we looked at the SelectMany operator that allows “binding” one source to another. More specifically, we turned user entries of terms (originating from .NET events) into asynchronous web service calls (brought to Rx using FromAsyncPattern). To deal with errors and out-of-order arrival, only a minimal portion of the code had to be tweaked. While we didn’t mention operators other than SelectMany, TakeUntil and Switch that deal with multiple sources, suffice it to say that a whole bunch of those exist, awaiting your further exploration.
Exercise 9 – Testability and mocking made easy
Objective: Testing asynchronous code is a hard problem. Because Rx represents asynchronous data sources as first-class objects implementing a common generic interface, it is in a unique position to simplify this task as well. We’ll learn how easy it is to mock observable sequences to create solid tests.
1. Each observable sequence can be regarded as a testable unit. In our dictionary suggest sample, two essential sequences are being used. One is the user input sequence; the other is its composition with the web service. Since all of those are first-class objects, we can simply swap them out for a sequence “mock”.
One particularly useful operator in Rx is ToObservable, defined as an extension method on IEnumerable<T>. It’s a pull-to-push adapter that enumerates (pulls) the given sequence and exposes it as an observable sequence that notifies (pushes) its observers of the sequence’s elements. As an example, replace the input sequence with an array-based mock observable as shown below:
//var input = (from evt in Observable.FromEventPattern(txt, "TextChanged")
//             select ((TextBox)evt.Sender).Text)
//            .DistinctUntilChanged()
//            .Throttle(TimeSpan.FromSeconds(1));
var input = new[] { "reac", "reactive", "bing" }.ToObservable();
Background: The deep duality between IEnumerable<T> and IObservable<T> has yielded a lot of great results in the development of Rx. The ability to go back and forth between both models using the ToObservable and ToEnumerable operators is just one example of this. Because of the intrinsically concurrent nature of observable sequences, those operators exhibit some interesting properties. ToObservable, as shown here, introduces more concurrency since it needs to pull the enumerable sequence without blocking the caller, in order to feed the elements to the resulting observable. ToEnumerable, on the other hand, reduces concurrency as all the observable sequence’s elements are tunneled onto the enumeration thread. An implication of the push-to-pull conversion realized by ToEnumerable is that it may have to buffer (enumeration may proceed at a different pace from the observable producer), while ToObservable never has to buffer. True mirror images: duality at its best!
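The round trip between the two models can be sketched in a few lines. This is only an illustration, assuming the System.Reactive package is referenced; the DualityDemo class and its RoundTrip method are hypothetical names, not part of the lab code.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

static class DualityDemo
{
    // Pushes the given terms through an observable sequence and pulls them out again.
    public static string[] RoundTrip(string[] terms)
    {
        // Pull-to-push: the array is enumerated and its elements are pushed to observers.
        IObservable<string> pushed = terms.ToObservable();

        // Push-to-pull: elements are handed to the enumerating thread, buffered where needed.
        return pushed.ToEnumerable().ToArray();
    }
}
```

The elements come back in their original order, which makes this kind of conversion convenient for asserting on the output of an observable sequence in a test.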
2. Now when we try to run the application, our web service will be fed “reac”, “reactive” and “bing” virtually at the same time since there’s no delay between the elements in the input. If the TakeUntil or Switch operator does its job correctly, only the results for the “bing” request should appear on the screen.
It’s a worthy experiment to take out the out-of-order arrival prevention mechanism from the previous exercise (i.e. TakeUntil or Switch) and observe responses coming back. With a bit of luck, you’ll see the issue cropping up again. A sequence with a higher likelihood of reproducing the issue is the following, using LINQ to Objects:
var input = (from len in Enumerable.Range(3, 6) // start at length 3, six lengths in total (3 through 8)
             select "reactive".Substring(0, len)) // rea, reac, react, reacti, reactiv, reactive
            .ToObservable();
3. Thanks to the various time-based operators, we can provide more realistic input. One way to mock user input more faithfully is to mimic typing. Generate is a suitable candidate for our next experiment. Let’s generate a sequence of incremental substrings for a given term, with random delays between them.
const string INPUT = "reactive";
var rand = new Random();
var input = Observable.Generate(
        3,
        len => len <= INPUT.Length,
        len => len + 1,
        len => INPUT.Substring(0, len),
        _ => TimeSpan.FromMilliseconds(rand.Next(200, 1200))
    )
    .ObserveOn(txt)
    .Do(term => txt.Text = term)
    .Throttle(TimeSpan.FromSeconds(1));
The time selector passed to Generate uses a random number generator to simulate typing speed variations between 200 and 1200 milliseconds (such that Throttle will sometimes let a substring through). Before we throttle the sequence, we use the side-effecting Do operator to put the substring in the TextBox control, so that the simulated typing is visible on screen. To be able to do this, we have to use ObserveOn to get onto the right UI context.
Note: While typing the code above you may notice a Subscribe method in the IntelliSense list for the INPUT string. This extension method on IEnumerable<T> is yet another example of the dual treatment of enumerable and observable sequences and is closely related to ToEnumerable and ToObservable. Similarly, a GetEnumerator method is defined on IObservable<T>.
Adding another Do logger to see Throttle passing on a search term to the web service is left as an exercise for the reader.
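For readers who want to compare notes, one possible shape of such logging is sketched here in a console setting. It is not the lab’s solution: it assumes the System.Reactive package, replaces the randomly timed Generate sequence with a hypothetical Subject, and drives Throttle from a HistoricalScheduler (a virtual clock) so that the outcome is deterministic. The ThrottleLogging class and the log messages are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class ThrottleLogging
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var scheduler = new HistoricalScheduler();   // virtual clock (assumption, keeps the run deterministic)
        var typed = new Subject<string>();           // hypothetical stand-in for the simulated typing

        var input = typed
            .Do(term => log.Add("typed: " + term))          // logger before Throttle
            .Throttle(TimeSpan.FromSeconds(1), scheduler)
            .Do(term => log.Add("throttled: " + term));     // logger after Throttle

        using (input.Subscribe())
        {
            typed.OnNext("rea");
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(300));    // shorter than the throttle window
            typed.OnNext("reac");
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1500));   // long pause: "reac" gets through
            typed.OnNext("react");
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1500));   // long pause: "react" gets through
        }
        return log;
    }
}
```

In this run the log shows all three terms as typed, but only “reac” and “react” as throttled, because “rea” was superseded before its one-second window elapsed. In the lab application the second Do would sit right after the Throttle call in the input query.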
4. Similarly, we could mock the web service by replacing the matchInWordNetByPrefix function. Obviously one will have to come up with some output to go with the input term, maybe from a local dictionary or based on some dummy output generation. Below is a sample web service mock:
Func<string, IObservable<DictionaryWord[]>> matchInWordNetByPrefix =
    //term => matchInDict("wn", term, "prefix");
    term =>
        Observable.Return(
            (from i in Enumerable.Range(0, rand.Next(0, 50))
             select new DictionaryWord { Word = term + i })
            .ToArray()
        ).Delay(TimeSpan.FromSeconds(rand.Next(1, 10)));
Reading the code inside-out, we first generate a range of between 0 and 49 numbers (the upper bound passed to rand.Next is exclusive), each of which we append to the service’s input term using a projection. At this point, we’re using the established LINQ to Objects functionality. As a result, we end up with an array of some number of outputs. For example, given “rea”, we may get { “rea0”, “rea1”, “rea2” } back. Since the web service contract is to return an observable of such an array, we use Observable.Return to create a single-element observable sequence with the generated array. Finally, we use the Delay operator that’s defined for observable sequences to add a random delay of 1 to 9 seconds before the response is sent back.
Running the sample again without the out-of-order prevention, it should be easy to hit the issue we described in detail before. Assume such an issue has been uncovered in your code; it’s incredibly simple to create a test case for it using mocks like these.
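As an illustration of what such a test case might look like, the sketch below makes the out-of-order arrival deterministic instead of random. It assumes the System.Reactive package; the OutOfOrderTest class, the simplified string-based service mock and the fixed delays are hypothetical, and a HistoricalScheduler (virtual clock) is used so no real time has to pass.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class OutOfOrderTest
{
    // Feeds two mock terms to a mock service and records the responses in arrival order.
    public static List<string> Run(bool useSwitch)
    {
        // Virtual clock: delays elapse instantly and in a deterministic order.
        var scheduler = new HistoricalScheduler();

        // Hypothetical service mock: the response to "reac" is slower than the one to "reactive".
        Func<string, IObservable<string>> service = term =>
            Observable.Return("results for " + term)
                      .Delay(TimeSpan.FromSeconds(term == "reac" ? 5 : 1), scheduler);

        var input = new[] { "reac", "reactive" }.ToObservable();

        var res = useSwitch
            ? input.Select(service).Switch()   // the stale request is cancelled
            : input.SelectMany(service);       // every response gets through

        var arrived = new List<string>();
        using (res.Subscribe(r => arrived.Add(r)))
        {
            scheduler.Start();   // run all scheduled work in virtual-time order
        }
        return arrived;
    }
}
```

Without Switch, the response for “reac” arrives last and would overwrite the newer results; with Switch, only the response for “reactive” is observed. Asserting on the returned list turns the bug into a repeatable test.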
Conclusion: The first-class object nature of observable sequences makes it easy to replace them, in contrast to various asynchronous technologies like .NET events. This allows for smooth testing of asynchronous programs using mock input sequences, e.g. based on enumerable sequences turned into observable ones using ToObservable.