Thursday, 2 January 2014

Publish and Subscribe with Hazelcast

A few weeks ago I wrote a blog on getting started with Hazelcast describing how ludicrously simple it is to create distributed maps, lists and queues. At the time I mentioned that Hazelcast does quite a few other things besides. This blog takes a quick look at another of Hazelcast’s features: its broadcast messaging system based on the Publish/Subscribe pattern. This takes the usual format where by the message sender app publishes messages on a certain topic. The messages aren't directed at any particular client, but can be read by any client that registers an interest in the topic.


The obvious scenario for publish and subscribe comes from the world of high finance and market makers. A market maker both buys and sells financial instruments such as stocks and competes for business by advertising both a buy and sell prices in a, usually electronic, market place. To implement a very simple market maker scenario using Hazelcast we need three classes: a StockPrice bean, a MarketMaker and a Client.

The following code has been added to my existing Hazelcast project that’s available on Github. There are no additional POM dependencies to worry about.

public class StockPrice implements Serializable {

 
private static final long serialVersionUID = 1L;

 
private final BigDecimal bid;

 
private final BigDecimal ask;

 
private final String code;

 
private final String description;

 
private final long timestamp;

 
/**
   * Create a StockPrice for the given stock at a given moment
   */
 
public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description,
     
long timestamp) {
   
super();
   
this.bid = bid;
   
this.ask = ask;
   
this.code = code;
   
this.description = description;
   
this.timestamp = timestamp;
 
}

 
public BigDecimal getBid() {
   
return bid;
 
}

 
public BigDecimal getAsk() {
   
return ask;
 
}

 
public String getCode() {
   
return code;
 
}

 
public String getDescription() {
   
return description;
 
}

 
public long getTimestamp() {
   
return timestamp;
 
}

 
@Override
 
public String toString() {

   
StringBuilder sb = new StringBuilder("Stock - ");
    sb.append
(code);
    sb.append
(" - ");
    sb.append
(description);
    sb.append
(" - ");
    sb.append
(description);
    sb.append
(" - Bid: ");
    sb.append
(bid);
    sb.append
(" - Ask: ");
    sb.append
(ask);
    sb.append
(" - ");
    SimpleDateFormat df =
new SimpleDateFormat("HH:MM:SS");
    sb.append
(df.format(new Date(timestamp)));
   
return sb.toString();
 
}
}

The StockPrice bean, with all the usual getters and setters, models a stock’s ask and bid price (sell and buy in normal language) at any given time and the MarketMaker class publishes these beans using Hazelcast.

Normally a market maker will publish prices in more than one financial instrument; however, for simplicity, in this demo the MarketMaker only publishes a single price.

public class MarketMaker implements Runnable {

 
private static Random random = new Random();

 
private final String stockCode;

 
private final String description;

 
private final ITopic<StockPrice> topic;

 
private volatile boolean running;

 
public MarketMaker(String topicName, String stockCode, String description) {
   
this.stockCode = stockCode;
   
this.description = description;
   
this.topic = createTopic(topicName);
    running =
true;
 
}

 
@VisibleForTesting
 
ITopic<StockPrice> createTopic(String topicName) {
   
HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
   
return hzInstance.getTopic(topicName);
 
}

 
public void publishPrices() {

   
Thread thread = new Thread(this);
    thread.start
();
 
}

 
@Override
 
public void run() {

   
do {
     
publish();
      sleep
();
   
} while (running);
 
}

 
private void publish() {

   
StockPrice price = createStockPrice();
    System.out.println
(price.toString());
    topic.publish
(price);
 
}

 
@VisibleForTesting
 
StockPrice createStockPrice() {

   
double price = createPrice();
    DecimalFormat df =
new DecimalFormat("#.##");

    BigDecimal bid =
new BigDecimal(df.format(price - variance(price)));
    BigDecimal ask =
new BigDecimal(df.format(price + variance(price)));

    StockPrice stockPrice =
new StockPrice(bid, ask, stockCode, description,
        System.currentTimeMillis
());
   
return stockPrice;
 
}

 
private double createPrice() {

   
int val = random.nextInt(2010 - 1520) + 1520;
   
double retVal = (double) val / 100;
   
return retVal;
 
}

 
private double variance(double price) {
   
return (price * 0.01);
 
}

 
private void sleep() {
   
try {
     
TimeUnit.SECONDS.sleep(2);
   
} catch (InterruptedException e) {
     
e.printStackTrace();
   
}
  }

 
public void stop() {
   
running = false;
 
}

 
public static void main(String[] args) throws InterruptedException {

   
MarketMaker bt = new MarketMaker("STOCKS", "BT.L", "British Telecom");
    MarketMaker cbry =
new MarketMaker("STOCKS", "CBRY.L", "Cadburys");
    MarketMaker bp =
new MarketMaker("STOCKS", "BP.L", "British Petrolium");

    bt.publishPrices
();
    cbry.publishPrices
();
    bp.publishPrices
();

 
}

}

As usual, setting up Hazelcast is fairly straight forward and most of the code in the MarketMaker class above has nothing to do with Hazelcast. The class is split into two part: construction and publishing prices. The constructor takes three arguments, which it stores away for later. It also creates a Hazelcast instance and registers a simple topic called "STOCKS" via the private createTopic() method. As you might expect, creating a Hazelcast instance and registering a topic takes two lines of code as shown below:

  ITopic<StockPrice> createTopic(String topicName) {
   
HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
   
return hzInstance.getTopic(topicName);
 
}

The rest of the class runs the price publishing mechanism using a thread to call the MarketMaker's run() method. This method generates a random bid, ask price for the associated stock code and publishes it using Hazelcast. Publishing is achieved using the following single line of code:

    topic.publish(price);

The final part of the MarketMaker class is the main() method and all this does is to create several MarketMaker instances and sets them running.

Now that Hazelcast knows about our ever changing stock prices, the next thing to do is to sort out the client code.

public class Client implements MessageListener<StockPrice> {

 
public Client(String topicName) {
   
HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
    ITopic<StockPrice> topic = hzInstance.getTopic
(topicName);
    topic.addMessageListener
(this);
 
}

 
/**
   *
@see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message)
   */
 
@Override
 
public void onMessage(Message<StockPrice> arg0) {
   
System.out.println("Received: " + arg0.getMessageObject().toString());
 
}

 
public static void main(String[] args) {

   
new Client("STOCKS");
 
}

}

As with any messaging system, the message sender code has to know both who to call and what to call. The "what to call" is achieved by the client creating an Hazelcast instance and registering an interest in the "STOCKS" topic, in the same way as the publisher as shown below:

    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
    ITopic<StockPrice> topic = hzInstance.getTopic
(topicName);
    topic.addMessageListener
(this);

The "what to call" is achieved by the client implementing Hazelcast's MessageListener interface and its single method onMessage()

  @Override
 
public void onMessage(Message<StockPrice> arg0) {
   
System.out.println("Received: " + arg0.getMessageObject().toString());
 
}

The final part of the client code is its main() method that creates a client instance.

The final thing to do is to run the code. For this I’ve simply put all the necessary JAR files in a single directory and there’s only two to consider: hazel cast-3.1.jar and guava-13.0.1.jar.


Once that was done I changed to the project’s classes directory:

cd /Users/Roger/git/captaindebug/hazelcast/target/classes

…and fired up the publisher

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.MarketMaker

…and then the client.

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.Client

Of course, if you’re running this on your machine using this rough and ready technique, then remember to replace /Users/Roger/tmp/mm with the path to the place where you’ve put your copies of these JAR files.

If you run a MarketMaker publisher in one terminal and a couple of clients in two other terminals, then you’ll get something like this, where you can see the prices being published and the clients receiving updates.


One thing to note about Hazelcast is that a ‘cluster' refers to a cluster of Hazelcast instances, rather than a cluster of JVMs. This isn’t obvious until you ask for more than one Hazelcast instance per application. When additional clients join the cluster you’ll see something like this:

Members [5] {
Member [192.168.0.7]:5701
Member [192.168.0.7]:5702
Member [192.168.0.7]:5703
Member [192.168.0.7]:5704 this
Member [192.168.0.7]:5705
}

In the above log, there are two listener entries, one for each client process, and three publisher entries, one for each of the MarketMaker instances started in the MarketMaker’s main() method.

The thing to consider here is whether or not it’s good practice to create a Hazelcast instance per object instantiation (as I’ve done in the sample code) or is it better to have a single static Hazelcast instance in your code. I’m not sure of the answer to this so if there are any Hazelcast gurus reading this please let me know.

That’s it then: Hazelcast is happily running in publish and subscribe mode, but I’ve not covered all of Hazelcast’s features; perhaps more on those later...


This source code is available on Github: https://github.com/roghughe/captaindebug/tree/master/hazelcast


3 comments:

Christoph Engelbert said...

If you want to make this really high performing just use DataSerializable (better IdentifiedDataSerializable) instead of Serializable and write the data yourself into the stream. That way you can get max performance (up to 2-5x higher than Serializable).

Rob Schierholz said...

This is a nice article on how to use Hazelcast pub/sub. I did have one question, does anyone know of any bench marks and or how performance would compare to say a rabbit mq. We've used Hazelcast successfully as a distributed cache, however have stuck with the move traditional queuing mechanisms for this type of work. I'm just curious if anyone has pros and or cons.

Rob Schierholz said...

This is a nice article on how to use pub/sub within Hazelcast. I'm curious though, does anyone have any experience on how it compares from a performance perspective to a JMS and or AMQP mechansism such as active mq or rabbit mq?