Thursday, August 19, 2010

Simplifying Silverlight Web Service Calls with Reactive Extensions (Rx)

I've been working with the Reactive Extensions (Rx) library quite a bit lately and am very impressed. While it is a new way of thinking about services, it certainly makes life much easier. In this example, I'll show you a way to simplify your web service calls using Rx. In fact, even if you don't use Reactive Extensions, you may benefit from the proxy wrappers that I'll describe.

I'm assuming you are familiar with Silverlight, web services, and have some exposure to the Managed Extensibility Framework. You'll also want to make sure you've got the latest version of Rx for Silverlight 4.

Let's get started! First, create a new Silverlight 4 Application. Keep all of the defaults: we do want a web project, but we aren't using RIA.

The Service: Server Side

Let's create a simple calculator service. Sure, it is a simple example, but it will make it easier to focus on the details of Rx rather than puzzling over a more complex web service example.

Create a new service and call it "Calculator." Just place it in the root of the web application. Create a contract and implement it, so that your service ends up looking like this:

namespace RxWebServices.Web
{
    [ServiceContract(Namespace = "http://csharperimage/")]
    public interface ICalculator
    {
        [OperationContract]
        long Add(int operand1, int operand2);       
    }

    [AspNetCompatibilityRequirements(RequirementsMode = AspNetCompatibilityRequirementsMode.Allowed)]
    public class Calculator : ICalculator
    {
        readonly Random _random = new Random();

        public long Add(int operand1, int operand2)
        {
            Thread.Sleep(TimeSpan.FromMilliseconds(_random.Next(1000) + 50));
            return operand1 + operand2;
        }        
    }
}

Notice I built in a delay. This is important to see how Rx helps us handle the asynchronous nature of web service calls.

Go ahead and hit CTRL+F5 to build and run without debugging. This will set up the service end point for us to grab in Silverlight. Now, in your Silverlight project, let's set some things up.

The Service: Client Side

First, we want to add some references to both Reactive Extensions (Rx) and the Managed Extensibility Framework. Below, I've highlighted the references to add:


Now we can add our service reference. Right-click references, choose "Add Service" and select "Discover Services in Solution". You should be able to select the calculator service. Put it in the namespace "Calculator Service" as depicted below.


Making Things Easy: ServiceProxy

Services can seem complex, but with the factory patterns provided by the framework and the support of relative paths, abstracting the creation of an end point is easy. I like to create a proxy class thas manages the end points for me. In this example, I store the end point as a constant. However, you can easily make it a parameter for your Silverlight application and construct it on the fly. All my "consumer" really cares about is the service contract, not the details of how to wire in the service endpoint. So, let's make it easy. Take a look at the following class. The class itself is never instanced directly, but it will export the service contract so that wherever I import it, I'll have a fully wired version of the proxy ready to use.

Create a folder called "Implementation" and add "ServiceProxy.cs". Your class will look like this:

namespace RxWebServices.Implementation
{
    public  class ServiceProxy
    {
        private const string CALCULATOR_SERVICE = "../Calculator.svc";
        private const string NOT_SUPPORTED = "Type {0} not supported";                

        private static readonly Dictionary<Type, Uri> _serviceMap
            = new Dictionary<Type, Uri> {{typeof (ICalculator), new Uri(CALCULATOR_SERVICE,UriKind.Relative)}};

        public static T GetProxyFor<T>()
        {
            if (!_serviceMap.ContainsKey(typeof(T)))
            {
                throw new TypeLoadException(string.Format(NOT_SUPPORTED, typeof (T).FullName));
            }

            return
                new ChannelFactory<T>(new BasicHttpBinding(), new EndpointAddress(_serviceMap[typeof (T)])).
                    CreateChannel();
        }

        [Export]
        public ICalculator CalculatorService
        {
            get { return GetProxyFor<ICalculator>(); }
        }
    }
}

Take a look. We are mapping the service contract to the end points. In our case, it is relative to the site serving the Silverlight application. Because the application is in ClientBin, we back up one level to access the service. Note this will work just as easily for a service hosted somewhere else: I would simply specify a relative or absolute uri. We only have one service, but the dictionary makes it easy to map multiple ones. The export uses the channel factory to generate an instance and return the client.

Our Internal Contract

I rarely let the rest of my application concern itself with the details of the service. Any other area of my application is simply asking for results based on input, regardless of how it is obtained. Therefore, I'll create a very light contract for the calculator service internally - one that is easy to mock and test.

Create a folder called "Contract" and add one interface, ICalculatorService. The interface looks like this:

namespace RxWebServices.Contract
{
    public interface ICalculatorService
    {
        IObservable<long> Add(int operand1, int operand2);
    }
}

Here is where things get interesting. You should be familiar with IEnumerable which we'll call a "pull" sequence of elements: you pull the values from the iterator. With Reactive Extensions, we invert this using IObservable to create a "push" sequence. With the push sequence, you subscribe and receive an event (pushed to you) when an element is available. In this case, we'll subscribe by sending in two operands, and wait to be pushed the result when it comes back.

Wrapping the Service

Now we've got a service proxy and an interface. Let's satisfy the contract. I'll show you the code, then explain it. Under the implementation folder, create a Calculator.cs class and wire it up like this:

namespace RxWebServices.Implementation
{
    [Export(typeof(ICalculatorService))]
    public class Calculator : ICalculatorService, IPartImportsSatisfiedNotification
    {
        [Import]
        public ICalculator CalculatorProxy { get; set; }

        private Func<int,int,IObservable<long>> _calculatorService;            

        public IObservable<long> Add(int operand1, int operand2)
        {
            return _calculatorService(operand1, operand2);
        }

        public void OnImportsSatisfied()
        {
            _calculatorService = Observable.FromAsyncPattern<int, int, long>
                (CalculatorProxy.BeginAdd, CalculatorProxy.EndAdd);
        }
    }
}

Let's break it down. First, you'll notice we import the calculator service. This is the actual proxy we set up in the previous class. When the import is satisfied, we use a helper method provided by Rx to convert the asynchronous call into an observable list. The FromAsyncPattern takes in the types of the inputs, followed by the type of the output. It creates a function that, when called, returns an observable list of the results. In this case, we cast it from the beginning call to our calculator service to the return call. This is the way we take the asynchronous call and turn it into an observable list.

When we actually want to use the method, we call the function with the inputs, and receive the output as the observable. Thus, we do all of the conversion internally, hide the implementation details, and just return a stream that can be subscribed to in order to fetch the results.

Take a look at the signature for the actual service:

private interface ICalculator
{
    IAsyncResult BeginAdd(int operand1, int operand2, AsyncCallback callback, object asyncState);
    
    long EndAdd(IAsyncResult result);
}

To use Rx, we want a function that takes all of the inputs up until the AsyncCallback parameter, and returns an observable list of the return value. In this case, our two inputs are int, and it returns a long, so our function signature is Func<int,int,IObservable<long>>. By using these same types on the FromAsyncPattern extension method, Rx will return us the appropriate function and expect a pointer to the methods to start and the end the call.

Fibonnacci Sequence

Now we can get to the fun part: using the service. We'll use the service two different ways to illustrate how the observable lists work. In the MainPage.xaml, add some rows, a set of buttons, and a stackpanel. Generate code behind for the buttons. It will look something like this:

<Grid x:Name="LayoutRoot" Background="White">
    <Grid.RowDefinitions>
        <RowDefinition Height="Auto"/>
        <RowDefinition Height="*"/>
    </Grid.RowDefinitions>
    <StackPanel Orientation="Horizontal" HorizontalAlignment="Center">
        <Button Content=" GO " Click="Button_Click" Margin="5"/>
        <Button Content=" GO " Click="Button_Click_1" Margin="5"/>
    </StackPanel>
    <StackPanel Orientation="Horizontal" Grid.Row="1" HorizontalAlignment="Stretch" VerticalAlignment="Stretch" x:Name="MainSurface"/>
</Grid>

Next, let's go to the code behind and wire in the first example. First, we'll add some properties we're going to be using:

[Import]
public ICalculatorService Calculator { get; set; }

private IDisposable _sequence;

private readonly Subject<long> _watcher = new Subject<long>();

private int _x, _y, _iterations;

The first piece is the service, which we import using MEF. When we subscribe to services, we receive a disposable observer. In order to cancel observations in progress and start new ones, we'll keep a reference to this using the _sequence field.

What's Your Favorite Subject?

The subject is interesting. Subjects are used to set up a publisher/subscriber model. The subject here is a long. Anyone with access to the subject can publish (send it a long value) and/or subscribe (receive notifications when values are published). We'll use this to bridge between our UI and the service.

Finally, we've got some local variables to use to keep track of state.

Next, we'll set everything up in the constructor:

public MainPage()
{
    InitializeComponent();

    if (DesignerProperties.IsInDesignTool) return;

    CompositionInitializer.SatisfyImports(this);

    _watcher.ObserveOnDispatcher().Subscribe(
        answer =>
            {
                var grid = new Grid
                                {
                                    Width = answer,
                                    Height = answer,
                                    Background = new SolidColorBrush(Colors.Red),
                                    Margin = new Thickness(5, 5, 5, 5)
                                };
                var tb = new TextBlock {Margin = new Thickness(2, 2, 2, 2), Text = answer.ToString()};
                grid.Children.Add(tb);
                MainSurface.Children.Add(grid);
                _Add();
            });
}

The first thing you'll notice is that if we're in the designer, all bets are off and we drop out. Otherwise, we compose the parts, which gives us our service import. Next, we'll subscribe to our subject. Notice that we don't have any service interaction yet. The subscription basically breaks down like this:

  • I'm interested in the subject with long values
  • When something happens, let me know on the dispatcher thread (as I'm going to do something with the UI)
  • When a long value is observed, give it to me: I'll make a grid as big as the value I received, put some text inside it, add it to the stack panel and then call the _Add method

That's very simple and straightforward. No we can explore the missing method. First, let's kick things off when the user clicks the first button. I want to use the add service to compute a fibonnacci ratio (each number is the sum of the previous two, started with 1 and 1). I'll implement the button click code-behind and add the missing method here:

private void Button_Click(object sender, RoutedEventArgs e)
{
    if (_sequence != null)
    {
        _sequence.Dispose();
    }

    MainSurface.Children.Clear();

    _x = 1;
    _y = 1;
    _iterations = 0;

    _watcher.OnNext(_x);            
}      
  
private void _Add()
{         
    _sequence.Dispose();

    if (++_iterations == 20)
    {
        return;
    }

    _sequence = Calculator.Add(_x, _y).Subscribe(answer =>
                                            {
                                                _x = _y;
                                                _y = (int)answer;
                                                _watcher.OnNext(answer);                                                     
                                            });
}

So the first part should be straight forward. If we had another sequence, dispose it. This will cancel any observations in progress. Clear the surface, initialize our variables, and then call the OnNext function on our subject. What's that? Simple: we just published a number. The subject will receive the number (1) and then push it to any subscriptions. We subscribed earlier, so we'll create a 1x1 grid and call the _Add method.

This method is even more interesting. First, we stop after 20 iterations. No sense in going to infinity. Next, we subscribe to the calculator service. Subscriptions to observable lists are the same as subscriptions to subjects. We're asking to watch for a value, and initiating the "watch" by sending in our first values (1 and 1). When we receive the answer, we shift the numbers to continue the sequence, and then publish the number to the subject.

This allows us to "daisy chain" service calls. We wait until we receive the first answer before we ask the next question. At this point, if you hit F5 (or CTRL-F5) to run it, and click the first button, you should see this:

Note if you keep clicking while it is rendering, it will start over. There will be no "hanging" calls because the calls are daisy chained. We are also not blocking the UI while waiting, or you wouldn't be able to click the button again. You can clearly see the delays on the server as the results are returned.

Here is a simplified overview of what is happening:

Random Addition

Now we'll throw another function into the mix. It's time to set up the second button. For this button, we're going to add two methods. The first is an enumerable that returns nothing but random numbers. It loops infinitely so we obtain as many numbers as we like, and we'll receive them in tuples:

private static IEnumerable<Tuple<int,int>> _RandomNumbers()
{
    var random = new Random();

    while (true)
    {
        yield return Tuple.Create(random.Next(100), random.Next(100));                
    }
}

In the event handler for the second button, add this bit of code:

private void Button_Click_1(object sender, RoutedEventArgs e)
{
    if (_sequence != null)
    {
        _sequence.Dispose();
    }

    MainSurface.Children.Clear();

    _sequence = _RandomNumbers()
        .ToObservable()
        .Take(20)                
        .Subscribe(numbers => Calculator.Add(numbers.Item1, numbers.Item2)
                                    .ObserveOnDispatcher()
                                    .Subscribe(result =>
                                                    {
                                                        var text = string.Format("{0}+{1}={2}", numbers.Item1,
                                                                                numbers.Item2, result);
                                                        var tb = new TextBlock
                                                                    {Margin = new Thickness(5, 5, 5, 5), Text = text};
                                                        MainSurface.Children.Add(tb);
                                                    }));                
}    

This is a little different. First, we're taking the enumerable list of random numbers and turning it into an observable list so the values will be pushed to us. This is just by way of demonstration; we could have just as easily iterated the list with a foreach loop instead. What's interesting here is that I can limit how many I grab with the Take(20) extension. I subscribe like I do to any other observable list, and when I receive the next number pair, I turn around and subscribe to the calculator service to add the numbers for me. Instead of publishing the result to the subject, I'm handling it myself. I observe on the dispatcher thread, then add a text block with the addition statement to the stack panel.

Go ahead and run the application, click the button, and you'll receive output that looks like this:

Observations (Pardon the Pun)

If you run this and click the go button, you might notice something interesting. No matter how many times you click, you get the full sequence of numbers. In other words, if I let 5 numbers come back, then click go, I'll receive a sequence of 35 numbers, not 25.

Even more interesting is if you click the second go button, wait until most (but not all) of the 20 numbers return, then click the first go button. You'll see the screen clear, but you'll receive a few sequences of added numbers before the fibonnacci sequence starts.

What's Going On?

But we disposed of the subscription, right? Not exactly. In this implementation, we always getting the same service subscription. The subscription we cancel is the outer observation. To better understand this, load up a tool like Fiddler and watch the service calls. In the first example, the call is made, there is a delay, it returns, and then the next call is made.

In the second example, however, almost all of the calls are made almost all at once. They return at different rates due to the delays on the server. So, when you start a new sequence, you subscribe to the same service and therefore get to watch the results return that hadn't made it back from the initial calls.

This is important to understand as you are building your extensions, because in some cases you might want a new observable sequence, while in others it makes sense to keep the existing one. It depends on your needs and the desired behavior.

Hopefully this will help open some new doors of understanding about Reactive Extensions!

Click here to download the source code for this article

Jeremy Likness

2 comments:

  1. Nice post. A couple comments:
    1) Generally the use of Subjects is discouraged. I think in this case, you can use Observable.Create instead.
    2) Regarding your issue unwiring the subscriptions, .Subscribe returns an IDisposable object. If you hold onto that, you can dispose it to unwire the subscription and roll back the event stack correctly.

    ReplyDelete
  2. Appreciate the feedback. Why are Subjects discouraged, out of curiosity? I'm obviously learning as I go and that's an interesting observation. Would like to understand more - any specific reasources you can point me to? Thanks!

    ReplyDelete