Monday, 18 February 2013

Producers and Consumers - Part 1

The Producer Consumer pattern is an ideal way of separating work that needs to be done from the execution of that work. As you might guess from its name the Producer Consumer pattern contains two major components, which are usually linked by a queue. This means that the separation of the work that needs doing from the execution of that work is achieved by the Producer placing items of work on the queue for later processing instead of dealing with them the moment they are identified. The Consumer is then free to remove the work item from the queue for processing at any time in the future. This decoupling means that Producers don't care how each item of work will be processed, how many consumers will be processing it or how many other producers there are. It's a fire and forget world as far as they're concerned. Likewise consumers don't need to know where the work item came from, who put it in the queue, and how many other producers and consumers there are. All they need to do is to grab some work from the queue and process it.

In the Java world, the Producer Consumer pattern is often based around some kind of blocking queue and there are several to choose from. These include ArrayBlockingQueue, LinkedBlockingQueue and PriorityBlockingQueue. Each have slightly different characteristics.



The diagram above shows a simple implementation using a single pair of producer consumer objects, whilst the diagram below demonstrates how this can be expanded to include multiple producers and consumers.


So, what about a practical scenario and some sample code? In the UK football is pretty popular (Soccer if you're reading this in the US) and every Saturday dozens of games are played throughout the land by a dedicated handful of professionals who sacrifice their afternoon in the pursuit of sporting excellence and large amounts of cash. A TV company sends a reporter to every game to feed live updates into a system and sent them back to the studio. On arriving at the studio the updates will be placed in a queue before being displayed on the screen by a Teletype. This scenario may have many producers, but only one or two consumers.


This scenario is modelled by today's sample code using the class diagram shown below.


The sample application, available on GitHub, is written as a Spring application because I want to separate the data from the code in a simple fashion, plus most readers of this blog already know about Spring, and it simplifies the scaffolding code that holds everything together. So far as Spring goes there are two Spring config files, matches.xml contains the match data whilst context.xml, shown below, contains a the Spring beans.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
  xmlns:p="http://www.springframework.org/schema/p"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:context="http://www.springframework.org/schema/context"
   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd" 
     default-init-method="start" 
  default-destroy-method="destroy">
 
  <import resource="matches.xml" />
 
  <bean id="theQueue" class="java.util.concurrent.LinkedBlockingQueue"/>
 
  <bean id="BillSkyes" class="com.captaindebug.producerconsumer.MatchReporter">
   <constructor-arg ref="match1"/>
   <constructor-arg ref="theQueue"/>
  </bean>
 
  <bean id="JohnSmith" class="com.captaindebug.producerconsumer.MatchReporter">
   <constructor-arg ref="match2"/>
   <constructor-arg ref="theQueue"/>
  </bean>
 
  <bean id="printHead" class="com.captaindebug.producerconsumer.PrintHead"/>
 
  <bean id="StudioTeletype" class="com.captaindebug.producerconsumer.original.Teletype">
   <constructor-arg ref="printHead"/>
   <constructor-arg ref="theQueue"/>
  </bean>

</beans>

The first task in designing any message based system is knowing the mechanism used to send your messages. In this case I've chosen a simple LinkedBlockingQueue and defined it in my Spring config.

<bean id="theQueue" class="java.util.concurrent.LinkedBlockingQueue"/>

The second thing to do is to define what it is that you're sending and I'm sending in-play updates about the big game as demonstrated in the code below.

public class Message implements Comparable<Message> {

 
private final String name;
 
private final long time;
 
private final String matchTime;
 
private final String messageText;

 
public Message(String name, long time, String messageText, String matchTime) {
   
this.name = name;
   
this.time = time;
   
this.messageText = messageText;
   
this.matchTime = matchTime;
 
}

 
/**
   *
@see java.lang.Comparable#compareTo(java.lang.Object)
   *
   *
@return a negative integer, zero, or a positive integer as this object is
   *         less than, equal to, or greater than the specified object
   */
 
@Override
 
public int compareTo(Message compareTime) {

   
int retVal = (int) (time - compareTime.time);

   
return retVal;
 
}

 
@Override
 
public String toString() {
   
return matchTime + " - " + name + " - " + messageText;
 
}

 
public String getName() {
   
return name;
 
}

 
public String getMessageText() {
   
return messageText;
 
}

 
public long getTime() {
   
return time;
 
}

 
public String getMatchTime() {
   
return matchTime;
 
}

}

This is a simple bean so there's not too much point in dwelling on it; however, during a match the will be a large number of these objects and they'll need organising and sorting, which is managed by the Match class below.

public class Match {

 
private final String name;

 
private final List<Message> updates;

 
public Match(String name, List<String> matchInfo) {

   
this.name = name;
   
this.updates = new ArrayList<Message>();
    createUpdateList
(matchInfo);
 
}

 
private void createUpdateList(List<String> matchInfo) {

   
createMessageList(matchInfo);
    Collections.sort
(updates);
 
}

 
private void createMessageList(List<String> matchInfo) {

   
for (String rawMessage : matchInfo) {

     
final String timeString = getTime(rawMessage);
     
final long time = parseTime(timeString);
     
final String messageString = getMessage(rawMessage);
      Message message =
new Message(name, time, messageString, timeString);
      updates.add
(message);
   
}
  }

 
private String getTime(String rawMessage) {
   
int index = rawMessage.indexOf(' ');
    String retVal = rawMessage.substring
(0, index);
   
return retVal;
 
}

 
/**
   * This may look weird, but the algorithm converts minutes to millis. eg 55:30 becomes
   * 55500mS
   */
 
private long parseTime(String timeString) {
   
String[] split = timeString.split(":");
   
long minutes = (Long.valueOf(split[0]) * 1000);
   
long seconds = (Long.valueOf(split[1])) * 1000 / 60;
   
long time = minutes + seconds;
   
return time;
 
}

 
private String getMessage(String rawMessage) {

   
int index = rawMessage.indexOf(' ');
    String retVal = rawMessage.substring
(index + 1);
   
return retVal;
 
}

 
public String getName() {
   
return name;
 
}

 
public List<Message> getUpdates() {
   
return Collections.unmodifiableList(updates);
 
}
}

This class takes a list of raw message strings as a constructor arg. Here's a snippet from matches.xml demonstrating the format of the messages:

<value>95:21 Full time The referee blows his whistle to end the game. </value>
<value>94:06 Unfair challenge on Laurent Koscielny by Kenwyne Jones results in a free kick. Wojciech Szczesny takes the free kick. </value>

...where the mm:ss component is the time of update.

The Match class needs to load and sort the updates and this is simply achieved by creating a bunch of Message objects and then sorting them using Collections.sort() as as the Message class implements the Comparable interface.

The next slice of code is the publisher and in this scenario it comes in the form of a MatchReporter. Each MatchReporter is allocated a Match and told about the queue via its constructor args. The MatchReporter is started by Spring calling its start() method as described in my blog on Three Spring Bean Lifecycle Techniques. Calling start() creates a new thread that allows the MatchReporter to check message times so that it can place them on the queue at the appropriate moment.

public class MatchReporter implements Runnable {

 
private final Match match;

 
private final Queue<Message> queue;

 
public MatchReporter(Match theBigMatch, Queue<Message> queue) {
   
this.match = theBigMatch;
   
this.queue = queue;
 
}

 
/**
   * Called by Spring after loading the context. Will "kick off" the match...
   */
 
public void start() {

   
String name = match.getName();
    Thread thread =
new Thread(this, name);

    thread.start
();
 
}

 
/**
   * The main run loop
   */
 
@Override
 
public void run() {

   
long now = System.currentTimeMillis();
    List<Message> matchUpdates = match.getUpdates
();

   
for (Message message : matchUpdates) {

     
delayUntilNextUpdate(now, message.getTime());
      queue.add
(message);
   
}
  }

 
private void delayUntilNextUpdate(long now, long messageTime) {

   
while (System.currentTimeMillis() < now + messageTime) {

     
try {
       
Thread.sleep(100);
     
} catch (InterruptedException e) {
       
e.printStackTrace();
     
}
    }
  }

}

Note that in this example I've converted minutes and seconds in to milliseconds so that the app runs in a reasonable amount of time. For example, 55:30 becomes 55500mS.

Having written the publisher code, the next thing to do is to sort out the consumer. In this example, the match updates are consumed by the Teletype (For those of you who don't know a teletype is take a look at Google. In the old days a TV camera used to be focused on a Teletype to bring viewers the latest scores).


The Teletype has the job of reading any messages on the queue and displaying them on the screen.

public class Teletype implements Runnable {

 
private final BlockingQueue<Message> queue;

 
private final PrintHead printHead;

 
public Teletype(PrintHead printHead, BlockingQueue<Message> queue) {
   
this.queue = queue;
   
this.printHead = printHead;
 
}

 
public void start() {

   
Thread thread = new Thread(this, "Studio Teletype");
    thread.start
();
 
}

 
@Override
 
public void run() {

   
while (true) {

     
try {
       
Message message = queue.take();
        printHead.print
(message.toString());
     
} catch (InterruptedException e) {
       
// TODO add some real error handling here
       
printHead.print("Teletype error - try switching it off and on.");
     
}
    }

  }

 
public void destroy() {
   
// Blank TODO...
 
}

}

The Teletype code is much the sames as the MatchReporter code. Again I'm using Spring to start the ball rolling via Teletype's start() method and again it creates a new thread to carry out its work. The difference here is that queue.take() is a blocking call meaning that program execution will suspend at this point until there's at least one update on the queue. When a message is available queue.take() will whip it from the queue and it'll then be displayed on the screen using the PrintHead class. Once the message has been printed the run loop goes back to queue.take() for the next message where it'll block again until one appears on the queue.

The code for this sample is available on Github and the eagle-eyed will have spotted that the tests in the TeletypeTest class have been disabled. This is because, although the Teletype code works it contains a couple of neat little flaws in that there's no way of shutting it down and that it's not particularly testable. As a developer you may not care too much about not being able to close your app down, but as an ops guy you do as starting and stopping stuff is pretty fundamental. In terms of the producer consumer patern there a few approaches you could take to rectify these problems, but more on that later...



The code for this sample is available on GitHub.

No comments: