Monday, 11 March 2013

Producers and Consumers - Part 3 Poison Pills

A couple of weeks ago I wrote part 2 of a short series of blogs on the Producer Consumer pattern. This blog focused upon the need to close down my Teletype’s worker thread, fixing a bug in the original code from part 1 of the series.

The idea here is that the Teletype’s worker thread can be controlled by a command from the application’s main thread. This command tells the worker thread to shutdown thus allowing the app the gracefully shutdown as demonstrated by the code below:

  @Override
 
public void run() {

   
while (run) {

     
try {
       
Message message = queue.take();
        printHead.print
(message.toString());
        messageCount++;
     
} catch (InterruptedException e) {
       
printHead.print("Teletype closing down...");
     
}
    }
   
printHead.print("Teletype Off.");
 
}

 
public void destroy() {
   
run = false;
    thread.interrupt
();
 
}

In this sample, the main thread calls the destroy() method, which sets the run variable to false and interrupts the worker’s blocking call to queue.take().

However, there’s a problem with this idea in certain circumstances. For example, will suddenly terminating the consumer’s worker thread cause problems in other parts of the system? Will there be data loss as important messages in the queue don’t get processed? If the answer to these questions is ‘yes’ then there’s another approach you can take: use a Poison Pill.

Poison Pill is a rather melodramatic name for simply placing a certain, known, data item on the queue and when the consumer reads this item it closes down. Obviously, the poison pill has to be the last item placed on the queue or else the consumer will shut down prematurely.

This idea is great in simple systems with only one producer and consumer as shown below:


...but takes a little more thought when there are multiple producers with a single consumer as in my football match updates scenario:


...and could fall apart completely in the case of multiple produces and consumers:


... as ensuring that each consumer receives a poison pill at the right time and all the data in the queue gets processed could be quite tricky.

In this blog I’m updating my Teletype code to shut itself down once the two MatcherReporters have sent all their data. The first thing to do is to decide on the message that will act as a poison pill. In the snippet below you can see that I’ve inserted a message that contains the text “END OF FILE” at the end of the match update stream.

  <value>95:30 END OF FILE</value>
  <value>95:00 Final Score  Fulham 0 - 1 Man Utd</value>
  <value>94:59 Full time The referee signals the end of the game.</value> 

I’ve inserted one of these messages into each set of game data.

The next thing to do is to modify the Teletype code adding a check for the poison pill message:

public class Teletype implements Runnable {

 
private static final String POISON_PILL_MESSAGE = "END OF FILE";

 
private final BlockingQueue<Message> queue;

 
private final PrintHead printHead;

 
private final int matchesPlayed;

 
private volatile boolean run = true;

 
private int pillsRecieved;

 
public Teletype(PrintHead printHead, BlockingQueue<Message> queue, int matchesPlayed) {
   
this.queue = queue;
   
this.printHead = printHead;
   
this.matchesPlayed = matchesPlayed;
 
}

 
public void start() {

   
Thread thread = new Thread(this, "Studio Teletype");
    thread.start
();
    printHead.print
("Teletype Online.");
 
}

 
@Override
 
public void run() {

   
while (run) {

     
try {
       
Message message = queue.take();
        handleMessage
(message);
     
} catch (InterruptedException e) {
       
printHead.print("Teletype closing down...");
     
}
    }
   
printHead.print("Teletype Off.");
 
}

 
private void handleMessage(Message message) {
   
if (allGamesAreOver(message.getMessageText())) {
     
run = false;
   
} else {
     
printHead.print(message.toString());
   
}
  }

 
private boolean allGamesAreOver(String messageText) {

   
if (POISON_PILL_MESSAGE.equals(messageText)) {
     
pillsRecieved++;
   
}

   
return pillsRecieved == matchesPlayed ? true : false;
 
}

 
@VisibleForTesting
 
boolean isRunning() {
   
return run;
 
}
}

One of the most significant changes here is the addition of the matchesPlayed instance variable. This variable tells the Teletype how many MatchReporters there are supplying it with data. Ultimately this breaks the Producer Consumer pattern in that the consumer now knows about the rest of the system; however, it’s necessary because we need to ensure that the Teletype shuts down at the end of all the data. In a single producer/consumer one to one system this isn’t necessary.

The other big change in the Teletype code is to the run() loop. Once a message has been retrieved from the queue it’s passed to the new handleMessage(...) method. The handleMessage(...) method checks whether or not all the games it’s receiving data from are over by calling allGamesAreOver(...), which checks the message text against the poison pill string. If the message test is the poison pill string then the pillsRecieved counter is updated. If the pillsRecieved equals the matchesPlayed variable then all the all the games are over and allGamesAreOver(...) returns true. This sets the run instance variable to false and the worker thread’s run() method exits.

So that’s about it, the melodramatic Poison Pill pattern in a nutshell, next time Murder in the Red Barn.



The code for this sample is available on GitHub.



No comments: