Reactive programming explained

Programmering C# Reactive Programming Asynchronous

Madeleine von Hausswolff

Systemutvecklare

This post aims to explain what reactive programming is and why, when and how to use it. I was inspired to write this after a weekend course by Dr. Venkat Subramaniam that I recently attended. Having a great teacher like Dr. Venkat explaining how to think about reactive programming was very useful and made me want to share what I had learnt.

By explaining what it is and how to use it, he placed it in the bigger picture of paradigms and showed how it relates to the concepts that we already familiar with in the development world.

 

Why does this need to be explained?

Reactive programming is not new. Most languages have Rx packages available. With all the benefits that it contains you could wonder why it is not more commonly used. I have not yet worked with a codebase that makes use of it, and I do not have the answer. I do know that one tricky part is that it requires a way of thinking that you might not be used to. I have used asynchronous programming with the async — await pattern in C#, but I am not used to actually thinking asynchronous.


I am very glad that a few years ago I tried to build a server side node.js application. At the time it was painful for me to handle the asynchronous nature of node.js, but now I can apply that way of thinking when learning reactive programming.

 

What is the problem that we need to solve with reactive programming?

To add reactive programming to an application is a way to achieve an architecture that is resilient and responsive. This creates a more pleasant user experience and makes it possible to scale up an application without it being overburdened by the heavier workload. The core concerns in reactive programming are four ideas brought together. These are:

  • Responsive — Fast feedback loop for the user to make him/her feel in power
  • Elastic — The application stays responsive no matter the workload. It can scale up.
  • Resilient — It deals with problems along the way and stays responsive in the face of failure.
  • Message Driven — changes are propagated through messages.

You can read more about the ideas behind reactive programming in “The Reactive Manifesto’s webpage”: reactivemanifesto.org Länk till annan webbplats.

 

Reactive programming explained by Dr. Venkat Subramaniam

Essentially, reactive programming is a paradigm oriented around data flows and the propagation of change. The parts that reactive programming consists of, as mentioned above, are not new and as far back as the 80s the concept of data flow was already in use. The new aspect is the way of bringing it all together.


Reactive programming is basically dealing with data flows. What’s new is what happens to the data when it flows and how it is transported back to the caller. Venkat illustrated this data flow with a couple of whiteboard pens put together creating a connection.


Venkat explains reactive programming as the next logical step from functional programming, or functional programming ++. By this, he means that if you compose multiple functions and add lazy loading you have what we refer to as reactive programming.

 

Illustrating Data flow

Venkat Subramaniam illustrating data flow


As a comparison, he suggests that lazy evaluation is to functional programming what polymorphism is to object oriented programming. Not saying that polymorphism is the only thing object-oriented programming provides, but it is the most essential thing about it. In correlation, lazy evaluation is the most essential thing for reactive programming. The lazy evaluation is the postponement of the decision of what is going to be executed, and that is good because then you don’t execute what is not needed and that saves a lot of effort for all parts involved.

 

So how does it work?

In the world of reacting programming the pipeline implements the Producer-Consumer Dataflow pattern. For the pipeline to become reactive we add the observer pattern, asynchronous calls with message callback and lazy evaluation. In reactive programming, the producer is an observable and the consumer is an observer.


Now I am going to explain how this works with an example. Since I am a .NET developer my example is written in C#. The concept however is applicable to and supported in many other languages. You can find a list of other language that also have reactive packages in the following page: http://reactivex.io/languages.html Länk till annan webbplats..


In my example I use the packages System.Reactive and System.Reactive.Linq from the .NET Rx library and these will need to be added to your solution to make it work. Let’s get started!

 

Create the pipeline

When you create a producer in C#, what you really create is an Observable sequence. This is a simple example using the Observer class factory method Create to create a custom observable sequence. To be able to listen to the sequence, an observer must be added. The factory method also creates an observer of the requested type. My example looks like this:

 

static void Main(string[] args)
{
    IObservable<CountObject> observableOfCountToTen =  
        Observable.Create<CountObject>(observer  
             => CountToTen(observer));
}

 

Now we have an observable with the type CountObject. The type is a class that I’ve created. It must be initialized with a start value and then you can make it count by calling the Increase method. It looks like this:

public class CountObject
{
    public CountObject(int startValue)
    {
        CountableInt = startValue;
    }
    public int CountableInt {get; private set;}
    public void Increase()
    {
        CountableInt++;
    }
    public override string ToString()
    {
        return $"Current value is: { CountableInt }";
    }
}

 

In our example above the generated observer is sent into a custom method created by me, called CountTo10 . It takes in an IObserver<CountObject> and returns an IDisposable. The method CountTo10 initializes an instance of CountObject and then starts a loop that increase the CountableInt of the object and then calls the observers OnNext() method with the instance of the CountObject as an argument. This is done 10 times until the CountableInt has the value 10. The method looks like this:

static IDisposable CountToTen(IObserver<CountObject> observer)
{
    var observableObject = new CountObject(0);
    while (observableObject.CountableInt < 10)
    {
        observableObject.Increase();
        observer.OnNext(observableObject);
    }
    return null;
}


Subscribe to the dataflow

Now we have created code that has some logic, but for this code to be executed there is something missing. This code will never be executed because nobody needs the result. For anything to happen somebody needs to subscribe for the information. We can make this happen by calling the Subscribe method. We add it to the main method with this line:

 

observableOfCountToTen.Subscribe(e => Console.WriteLine(e));

 

When this is done, the delegate gets executed using the custom method CountTo10 that creates an instance of the CountObject and sends it to the observers OnNext() method. Each time the object will be printed since this is what the code in this example does with all the values from the observer. Finally, the ten lines with the count from 1 to 10 will be written.


In this example, the sequence of data that flows through the pipeline is an instance of the CountObject. Before we subscribe to the pipeline, no data will flow throw it. All we have done is just set up the pipeline and the observable waits until somebody needs the execution of it. The observer’s method Subscribe could be called more than once. If we copy the subscribe line above, the ten lines would be printed one more time. It can be done as many times as you like. Like this, the power of execution is laid in the hands of the consumer.


In the world of reactive programming this is called a cold observer, because of its nature of doing nothing without subscribers.


Filtering the sequence

If we don’t want all the values that are in the observed stream, we could filter it on the way back to the caller. This is done by adding a filtering function to the stream before we receive it. Let’s say we only want to react on the even number in the stream. Then we filter out those before printing by adding a Where clause like this:

 

observableOfCountToTen.Where(e => e.CountableInt % 2 == 0)
.Subscribe(e => Console.WriteLine(e))


The result will be that only the numbers 2, 4, 6, 8 and 10 will be printed. We could add as many filters as we want before subscribing or use other filters as for example: Take, TakeWhile, Skip, SkipWhile. The printing will be done after filtering the stream so we can control what is printed. This is how we benefit form the lazy evaluation that is the nature of functional programming.

 

Non-blocking calls

You may have recognized that the OnNext method is used in another context. It is the IEnumerable interface that also has an OnNext method. The role it plays is the same, which is to move forward and send the next data to the caller. For example, when an IEnumerable list is looped through with the foreach method, then the OnNext is called for each step in the loop. The difference between these two methods with the same name, is that the enumerable list executes synchronous and the observer executes asynchronous. When an enumerable list is looped through, the calling code must wait for the execution to be done, but when an observables OnNext method is called, the thread is released straight away, since it is asynchronous. The caller can do other stuff while waiting for the data to be delivered and this is how the call is non-blocking. This enables your program to be responsive even though the data takes time to be processed.


In the IObserver Interface, compared to the IEnumerable, there are two extra methods: OnCompleted and OnError. If we use them, we benefit from having more than one thread streaming back. An observer has three pipelines back with different responsibilities. The task for OnNext is to stream the observed values, for OnCompleted it is to notify the caller that there are no more values to observe and for OnError it is to notify that something went wrong so no more data will be sent. Because of these methods, the caller can choose to continue with different actions depending on the type of information it gets. If we want to react on these different events, we must add the following lines to our example:

 

observableOfCountToTen.Subscribe(e => Console.WriteLine(e),
    err => Console.WriteLine(err.Message),
    () => Console.WriteLine(“The subscription is completed”));


We also need to modify the CountToTen method to add the call to OnComplete, when the job is done. We change the code to the following:

 

var observableObject = new CountObject(0);
while (observableObject.CountableInt < 10)
{
   observableObject.Increase();
   observer.OnNext(observableObject);
}
observer.OnCompleted();
return null;

 

If we run the program now. The same lines as before will be printed followed by the line: “The subscription is completed”. Since there are no errors in the program as is, we must add a bug to be able to test the OnError method. Let’s say that CountableInt doesn’t like to be set to 4 and has forbidden it like this:

public void Increase()
{
   CountableInt++;
   if (CountableInt == 4)
   {
       throw new Exception(“Number 4 is forbidden”);
   }
}


If we run the program now, the counting will never be completed. We will have three lines printed, after that the error message Number 4 is forbidden and after that nothing more. The subscription will be terminated before it completes.


These three different callbacks give us the power to control the result of the execution of the data. If there is an error, the program will not crash. Instead we can choose what we want to happen. Also, when there is no more data coming, we can choose what we want to happen. This is without having to block the thread and wait in a try catch block.

 

Reactive programming 2

Venkat Subramaniam during the weekend course engaged in explaining reactive programming.


Message driven

Now we have seen that reactive programming uses the observer pattern, is lazy and executes asynchronously. What about messages, where are they? Are they forgotten in the example above? The answer is no, it is just abstracted away. The messages here are the events for the OnNext, OnComplete and OnError that notifies the subscriber whenever one of them has happened. Therefore, it’s said that reactive programming pushes changes instead of pulling and is message driven.

 

Hot and cold observables

In the world of reactive programming two kind of observables exists, hot and cold. A cold observable is the one I’ve presented in the example above. A cold observable only produces something when someone has made a subscription. A hot observable produces all the time even if nobody is listening. You can make a cold observable hot by calling the method publish. To get information from it you then need call connect. It looks like this:

 

observableOfCountToTen.Publish().Connect()


The observable in my example is not so suited to be a hot observable, but this is how to do it.

 

Conclusion

I have tried to explain the basics of reactive programming in this post. There are other ways of using reactive programming utilizing other classes and methods in the Rx package. I’ve chosen this usage purely to be able to explain how it works. The aim is to make it easier for you to get into the mindset of reactive programming. Finally, I want to thank Dr. Venkat Subramaniam for inspiring me to learn more about reactive programming and write this post.

 

His passion of teaching and engagement in the subject made it so much easier for me to learn and explore this subject further. I hope I have succeeded in conveying the basic concepts to you, because there is a lot more to read and learn about this subject. I have used the following sources to further my own knowledge:

 

Introduction to Rx: http://introtorx.com Länk till annan webbplats./
Reactive manifesto: https://www.reactivemanifesto.org/ Länk till annan webbplats.
ReactiveX: http://reactivex.io/ Länk till annan webbplats.

 

Link to example code in github: https://github.com/madvonh/ReactiveExample Länk till annan webbplats.

Fler inspirerande inlägg du inte får missa