October 13, 2018

Autobahn|Java – WAMP Library of Crossbar.io

This article describes how to implement a WAMP client using the library Autobahn|Java.

You have already learned how to set up a local Web Application Messaging Protocol (WAMP) router. This article describes the first steps in programming a microservice in Java with the library Autobahn|Java and, thus, how to use a WAMP router.

Idea of Autobahn|Java

A WAMP network usually consists of a router and multiple clients. The clients are often microservices which are part of at least one larger service. While connected to the router, these microservices can communicate with each other. The article about WAMP describes the different ways of communicating via this protocol.

It does not matter in which programming language the clients are implemented. Autobahn|Java is the Crossbar.io library for implementing a client for a WAMP network specifically in Java. It is based on Netty, a framework for asynchronous network programming.

Connection to the Router

First, an Executor (java.util.concurrent) and a Session (io.crossbar.autobahn.wamp) are needed.

Executor executor = Executors.newSingleThreadExecutor();
Session session = new Session(executor);

Together with the URL and the realm of the WAMP router, the Executor and the Session have to be passed to a new Client (io.crossbar.autobahn.wamp). The Client can then connect to the router.

Client client = new Client(session, url, realm, executor);
try {
    client.connect().get();
} catch (Exception e) {
    e.printStackTrace();
}
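The call to get() blocks the calling thread until the returned future completes, and a failure is reported wrapped in an ExecutionException. The following is a minimal sketch of that waiting pattern using only java.util.concurrent. A plain CompletableFuture stands in for the result of connect(); the class BlockingWaitSketch, the helper await(), and the timeout values are hypothetical and not part of Autobahn|Java.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingWaitSketch {

    // Hypothetical helper: waits for a future and describes the outcome as text.
    static String await(CompletableFuture<String> future, long timeoutMillis) {
        try {
            // Blocks the calling thread for at most timeoutMillis.
            return "completed: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (ExecutionException e) {
            // The original failure is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("connected");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("router unreachable"));

        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed

        System.out.println(await(ok, 100));
        System.out.println(await(failed, 100));
        System.out.println(await(pending, 100));
    }
}
```

Unwrapping the cause, rather than only printing the stack trace, makes it easier to tell a timeout apart from a real connection failure.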

After the Client has connected, you can register remote procedures, call them, publish on topics, and subscribe to topics. An example of a class that only establishes the connection can be found on my GitHub page.

CompletableFuture

Objects of the type CompletableFuture are central to the Autobahn library. As the name suggests, they are objects that will be completed in the future. The first step is to tell the JVM where the value will come from. The program then continues even though no value has been received yet. The method thenAccept() defines what should be done once the value is received. Other methods, such as whenComplete(), can be used in place of thenAccept().

private void callExample(Session session, SessionDetails details) {
    int arg1 = 11, arg2 = 22;
    CompletableFuture<CallResult> completableFuture = session.call("com.felixseifert.procedure", arg1, arg2);
    completableFuture.thenAccept(callResult -> System.out.println("Result: " + callResult.results));
}
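To see the difference between thenAccept() and whenComplete() without a router, here is a small sketch that uses only the standard library. The class CompletionCallbacksSketch and the helper observe() are hypothetical names chosen for illustration. The point it shows: thenAccept() only runs when a value arrives, while whenComplete() also runs when the future fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CompletionCallbacksSketch {

    // Hypothetical helper: attaches both callbacks and records what each one sees.
    static List<String> observe(CompletableFuture<Integer> future) {
        List<String> log = new ArrayList<>();
        // Runs only if the future completes with a value.
        future.thenAccept(value -> log.add("thenAccept: " + value));
        // Runs in both cases; exactly one of value and error is meaningful.
        future.whenComplete((value, error) -> {
            if (error != null) {
                log.add("whenComplete error: " + error.getMessage());
            } else {
                log.add("whenComplete: " + value);
            }
        });
        return log;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> success = new CompletableFuture<>();
        List<String> successLog = observe(success);
        System.out.println("Before completion: " + successLog); // callbacks have not run yet
        success.complete(33);
        System.out.println("After completion: " + successLog);

        CompletableFuture<Integer> failure = new CompletableFuture<>();
        List<String> failureLog = observe(failure);
        failure.completeExceptionally(new RuntimeException("call failed"));
        System.out.println("After failure: " + failureLog);
    }
}
```

If a call can fail, for example because the procedure is not registered, whenComplete() gives you a place to log the error instead of silently receiving nothing.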

When a CompletableFuture should be created directly after the object has connected to the router, it should be written in a separate method and added as a listener. A good place to do this is the constructor of the related object.

public TestController(Executor executor) {
    this.executor = executor;
    this.session = new Session(executor);
    session.addOnJoinListener(this::callExample);
    // Other methods could be added as listeners to be executed when the session joins the realm.
}

Register a Procedure

Registering a procedure works quite similarly. However, instead of passing arguments to a call, you register a handler method that is invoked whenever the procedure is called.

private void registerExample(Session session, SessionDetails details) {
    CompletableFuture<Registration> completableFuture =
            session.register("com.felixseifert.procedure", this::registerHandler);
    completableFuture.thenAccept(registration ->
            LOGGER.info("Procedure registered: " + registration.procedure));
}

Afterwards, the registered handler method also has to be implemented. Its return value is what the procedure returns to the caller.

private int registerHandler(List<Object> args, InvocationDetails details) {
    // Adds the two positional arguments passed by the caller.
    return (int) args.get(0) + (int) args.get(1);
}
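The casts to int in the handler only succeed if both arguments arrive as Integer objects. As a sketch of a more defensive variant, the following code accepts any Number and rejects everything else. That numbers might arrive as another type such as Long is an assumption on my part and depends on the serializer in use. The class AddHandlerSketch and the helpers toInt() and add() are hypothetical and use only the standard library.

```java
import java.util.Arrays;
import java.util.List;

public class AddHandlerSketch {

    // Hypothetical helper: converts one positional argument to int, rejecting non-numbers.
    static int toInt(Object arg) {
        if (arg instanceof Number) {
            return ((Number) arg).intValue();
        }
        throw new IllegalArgumentException("Expected a number but got: " + arg);
    }

    // Same logic as the handler body, but tolerant of Integer, Long, and other Number types.
    static int add(List<Object> args) {
        if (args.size() < 2) {
            throw new IllegalArgumentException("Expected two arguments but got " + args.size());
        }
        return toInt(args.get(0)) + toInt(args.get(1));
    }

    public static void main(String[] args) {
        System.out.println(add(Arrays.<Object>asList(11, 22)));
        System.out.println(add(Arrays.<Object>asList(11L, 22)));
    }
}
```

The handler itself could then simply return add(args).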

Subscribe to a Topic

Once subscribed to a topic, you can receive multiple publications on this topic. These publications are handled by a separate handler method, which is assigned to the subscription (similar to registering a procedure).

CompletableFuture<Subscription> completableFuture = session.subscribe("com.felixseifert.topic", this::subscribeHandler);
completableFuture.thenAccept(subscription -> LOGGER.info("Subscribed to topic " + subscription.topic));

The handler method assigned to the subscription does not have to return anything because no message is sent back to the publisher.

private void subscribeHandler(List<Object> args) {
    LOGGER.info("Received via subscription: " + args.get(0));
}

Publish on a Topic

Publishing on a topic is quite straightforward. You just have to remember to pass the PublishOptions, along with the arguments, to the publish() call. The first argument (“acknowledge”) can be either true or false. In this example, the second one (“excludeMe”) needs the value “false”. Otherwise, the subscriber might not receive the messages, because a publisher that is itself subscribed to the topic is excluded from its own publications.

String args = "Hello Subscriber";
PublishOptions publishOptions = new PublishOptions(true, false);

CompletableFuture<Publication> completableFuture = session.publish("com.felixseifert.topic", publishOptions, args);
completableFuture.thenAccept(publication -> LOGGER.info("Published on com.felixseifert.topic"));
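The effect of “excludeMe” is easiest to see when the publisher is also a subscriber. The following toy model is not Autobahn|Java and not a real WAMP router; ToyBroker, its methods, and the session IDs are hypothetical and exist only to illustrate the rule that a publisher is skipped when excludeMe is true.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ExcludeMeSketch {

    // Toy in-memory broker (hypothetical); it only models the excludeMe rule.
    static class ToyBroker {
        private static class Sub {
            final String sessionId;
            final Consumer<String> handler;

            Sub(String sessionId, Consumer<String> handler) {
                this.sessionId = sessionId;
                this.handler = handler;
            }
        }

        private final Map<String, List<Sub>> subscriptions = new HashMap<>();

        void subscribe(String sessionId, String topic, Consumer<String> handler) {
            subscriptions.computeIfAbsent(topic, t -> new ArrayList<>())
                    .add(new Sub(sessionId, handler));
        }

        // Delivers the message to all subscribers and returns the number of deliveries.
        int publish(String publisherId, String topic, String message, boolean excludeMe) {
            int delivered = 0;
            for (Sub sub : subscriptions.getOrDefault(topic, new ArrayList<>())) {
                if (excludeMe && sub.sessionId.equals(publisherId)) {
                    continue; // the publisher does not receive its own publication
                }
                sub.handler.accept(message);
                delivered++;
            }
            return delivered;
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        String topic = "com.felixseifert.topic";
        broker.subscribe("session-1", topic, msg -> System.out.println("session-1 received: " + msg));

        // session-1 publishes on a topic it is subscribed to.
        System.out.println("excludeMe=true, deliveries: "
                + broker.publish("session-1", topic, "Hello Subscriber", true));
        System.out.println("excludeMe=false, deliveries: "
                + broker.publish("session-1", topic, "Hello Subscriber", false));
    }
}
```

When publisher and subscriber are separate sessions, the flag makes no difference for the subscriber.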

How to Go On

The code snippets above are only simple examples of Pub/Sub and RPCs. These methods can be combined, and services written in different programming languages can communicate with each other. Furthermore, you can also implement authentication. For more information about this library, have a look at the GitHub page of Autobahn|Java.

You will find this example on my GitHub page, which also has a simplified version where everything is implemented in one class only.

Posted in Java, WAMP