Tuesday, July 08, 2014

RxJava + Java8 + Java EE 7 + Arquillian = Bliss



Microservices are an architectural style in which each service is implemented as an independent system. Each service can have its own persistence system (although this is not mandatory), deployment process, programming language, and so on.

Because a system is composed of more than one service, each service communicates with other services, typically using a lightweight protocol such as HTTP and following a RESTful approach. You can read more about microservices here: http://martinfowler.com/articles/microservices.html

Let's see a really simple example. Suppose we have a bookshop where users can browse a catalog, and when they find a book they want to know more about, they click on its ISBN and a new screen opens with detailed information about the book and the comments written by readers.

This system may be composed of two services:
  • One service to get the book details. They could be retrieved from any legacy system such as an RDBMS.
  • One service to get all the comments written about a book. In this case that information could be stored in a document-oriented database.
The problem here is that for each user request we need to open two connections, one for each service. Of course, we need a way to run those jobs in parallel to improve performance. And here lies one problem: how can we deal with these asynchronous requests? The first idea is to use the Future class. For two services that may be fine, but if you need four or five services the code becomes more and more complex. You may also need to take data from one service and use it in another, or adapt the result of one service to be the input of another. So there is a cost in managing threads and synchronization.
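
To make the Future-based idea concrete, here is a minimal sketch that uses only the JDK. The class FutureBookClient and the methods fetchBookInfo and fetchComments are hypothetical stand-ins that return canned JSON strings instead of making real HTTP calls.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-ins for the two remote services; a real version
// would issue HTTP requests instead of returning canned strings.
class FutureBookClient {

    static String fetchBookInfo(String isbn) {
        return "{\"isbn\":\"" + isbn + "\",\"title\":\"Some Book\"}";
    }

    static String fetchComments(String isbn) {
        return "[\"Great read\",\"Too long\"]";
    }

    // Submits both calls to a thread pool, then blocks until both are done.
    static String bookAndComments(String isbn) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> book = pool.submit(() -> fetchBookInfo(isbn));
            Future<String> comments = pool.submit(() -> fetchComments(isbn));
            // Each get() blocks the calling thread until that result is ready.
            return "{\"book\":" + book.get() + ",\"comments\":" + comments.get() + "}";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(bookAndComments("1234"));
    }
}
```

Even in this small case the caller has to manage the pool, block on each get() and unwrap two kinds of exceptions. With more services, or with calls that depend on each other, that bookkeeping grows quickly.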

It would be great to have a clean and easy way to deal with this problem, and this is exactly what RxJava offers. RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

With RxJava, instead of pulling data from a structure, data is pushed to subscribers as events, and each subscriber listens for those events and acts accordingly. You can find more information at https://github.com/Netflix/RxJava.


So what we are going to do is implement the example described above using RxJava, Java EE 7, Java 8, and Arquillian for testing.

This post assumes you know how to write REST services using the Java EE specification.

So let's start with two services:




And finally it is time to create a third service, a facade, which receives the request from the client, sends a request to both services in parallel, and finally zips both responses and sends the result back to the client. Zipping is the process of combining sets of items emitted together via a specified function (not to be confused with compression :)).
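
As a rough illustration of what zipping means, here is a sketch on plain lists rather than on observables. The class ZipSketch and its zip helper are hypothetical names written for this post and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Illustration only: zip over in-memory lists, not RxJava's Observable.zip.
class ZipSketch {

    // Pairs the i-th item of each list and combines the pair with fn.
    // Stops at the shorter list, because a pair needs one item from each side.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> fn) {
        int n = Math.min(left.size(), right.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(fn.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> titles = Arrays.asList("Book A", "Book B", "Book C");
        List<Integer> commentCounts = Arrays.asList(3, 7);
        // "Book C" has no partner, so only two combined items are produced.
        System.out.println(zip(titles, commentCounts, (t, c) -> t + ": " + c + " comments"));
    }
}
```

In the facade each side emits exactly one item (the book and its comments), so the zip produces a single combined response.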


Basically we create a new service. In this case the URLs of both services we are going to connect to are hardcoded. This is done for academic purposes; in production-like code you would inject them from a producer class, a properties file, or whatever mechanism you use for this purpose. Then we create a javax.ws.rs.client.WebTarget for consuming each RESTful web service.

After that we need to implement the bookAndComment method using the RxJava API.

The main class used in RxJava is rx.Observable. This class is an observable, as its name suggests, and it is responsible for firing events to push objects. By default events are synchronous, and it is the developer's responsibility to make them asynchronous.

So we need one asynchronous observable instance for each service:

Basically we create an Observable that will execute the specified function when a Subscriber subscribes to it. The function is created using a lambda expression to avoid creating nested inner classes. In this case we are returning a JsonObject as the result of calling the bookinfo service. The result is passed to the onNext method so that subscribers can receive it. Because we want to execute this logic asynchronously, the code is wrapped inside a Runnable block.

It is also required to call the onCompleted method once all the logic is done.

Notice that, because we want to make the observable asynchronous, apart from creating a Runnable we are using an Executor to run the logic in a separate thread. One of the great additions in Java EE 7 is a managed way to create threads inside a container. In this case we are using the ManagedExecutorService provided by the container to spawn a task asynchronously in a thread different from the current one.
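
The shape of that contract can be sketched with a toy model in plain Java. Everything here is an assumption made for illustration: Source and Subscriber are hypothetical stand-ins, not the RxJava classes, the book data is a canned string, and a plain ExecutorService takes the place of the container's ManagedExecutorService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Toy stand-in for the observable/subscriber contract; NOT the RxJava classes.
class AsyncSourceSketch {

    // Hypothetical subscriber with the two callbacks named in the text.
    interface Subscriber<T> {
        void onNext(T item);
        void onCompleted();
    }

    // Hypothetical observable: holds the function to run on subscription.
    static final class Source<T> {
        private final Consumer<Subscriber<T>> onSubscribe;
        Source(Consumer<Subscriber<T>> onSubscribe) { this.onSubscribe = onSubscribe; }
        void subscribe(Subscriber<T> subscriber) { onSubscribe.accept(subscriber); }
    }

    // Builds a source whose work runs on the executor, not on the caller's thread.
    static Source<String> bookInfo(ExecutorService executor, String isbn) {
        return new Source<>(subscriber -> {
            Runnable task = () -> {
                subscriber.onNext("{\"isbn\":\"" + isbn + "\"}"); // push the result
                subscriber.onCompleted();                          // signal the end
            };
            executor.execute(task);
        });
    }

    // Subscribes, waits for completion, and returns the events in arrival order.
    static List<String> run(String isbn) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> events = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            bookInfo(executor, isbn).subscribe(new Subscriber<String>() {
                @Override public void onNext(String item) { events.add("next:" + item); }
                @Override public void onCompleted() { events.add("completed"); done.countDown(); }
            });
            done.await(); // only this demo blocks; a real subscriber would just react
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run("1234"));
    }
}
```

Nothing happens until subscribe is called, and the subscriber never asks for data: it is simply told when an item arrives and when the sequence is finished.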

This observable is similar to the previous one, but instead of getting the book info it gets an array of comments.

Then we need to create an observable in charge of zipping both responses once both of them are available. This is done using the zip method of the Observable class, which receives two Observables and applies a function to combine their results. In this case the function is a lambda expression that creates a new JSON object containing both responses.
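
For readers without RxJava at hand, the same "combine when both are ready" idea can be approximated with the JDK's CompletableFuture.thenCombine. This is an analogy under stated assumptions, not the RxJava code of this post: CombineSketch, fetchBookInfo and fetchComments are hypothetical names, and the services are replaced by canned strings.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only analogy for zipping two asynchronous results; not RxJava.
class CombineSketch {

    // Hypothetical stand-ins for the two remote calls.
    static String fetchBookInfo(String isbn) {
        return "{\"isbn\":\"" + isbn + "\"}";
    }

    static String fetchComments(String isbn) {
        return "[\"Great read\"]";
    }

    // Starts both calls asynchronously and combines them once both have finished.
    static CompletableFuture<String> bookAndComments(String isbn) {
        CompletableFuture<String> book =
                CompletableFuture.supplyAsync(() -> fetchBookInfo(isbn));
        CompletableFuture<String> comments =
                CompletableFuture.supplyAsync(() -> fetchComments(isbn));
        // The combining function runs only when both results are available.
        return book.thenCombine(comments,
                (b, c) -> "{\"book\":" + b + ",\"comments\":" + c + "}");
    }

    public static void main(String[] args) {
        // join() is used here only so the demo can print the result before exiting.
        System.out.println(bookAndComments("1234").join());
    }
}
```

As with zip, the caller describes how to combine the two results and does not have to block on either of them individually.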

Let's take a look at the previous service. We are using one of the new additions in Java EE 7, JAX-RS 2.0 asynchronous REST endpoints, by means of the @Suspended annotation. Basically what we are doing is freeing server resources and generating the response when it is available using the resume method.

And finally a test. We are using WildFly 8.1 as the Java EE 7 server, together with Arquillian. Because each service may be deployed on a different server, we are going to deploy each service in a different WAR but inside the same server.

So in this case we are going to deploy three WAR files, which is really easy to do with Arquillian.

In this case the client requests all the information about a book. On the server side, the zip method waits until the book and the comments have been retrieved in parallel, then combines both responses into a single object that is sent back to the client.

This is a very simple example of RxJava. In fact, in this case we have only seen how to use the zip method, but RxJava provides many more methods that are just as useful, such as take(), map(), merge(), ... (https://github.com/Netflix/RxJava/wiki/Alphabetical-List-of-Observable-Operators)

Moreover, in this example we have only connected to two services and retrieved information in parallel, and you may wonder why not use the Future class. It is totally fine to use Future and callbacks in this example, but in real life your logic probably won't be as simple as zipping two services. Maybe you will have more services, or maybe you will need to get information from one service and then open a new connection for each result. As you can see, you may start with two Future instances but end up with a bunch of Future.get() calls, timeouts, ... It is in these situations that RxJava really simplifies the development of the application.

Furthermore, we have seen how to use some of the new additions in Java EE 7, such as how to develop an asynchronous RESTful service with JAX-RS.

In this post we have learnt how to deal with the interconnection between services and how to make them scalable and less resource-consuming. But we have not talked about what happens when one of these services fails. What happens to the callers? Do we have a way to manage it? Is there a way to avoid spending resources when one of the services is not available? We will touch on this in the next post, which talks about fault tolerance.

We keep learning,
Alex.
Bon dia, bon dia! Bon dia al dematí! Fem fora la mandra I saltem corrents del llit. (Bon Dia! - Dàmaris Gelabert)
Music: https://www.youtube.com/watch?v=BF7w-xJUlwM
