You may wonder why this should be a problem. To you, as a developer, it isn't really a problem at all, only a little bit of sloppy programming, but to one of your operations guys it can make life unnecessarily difficult. The reason is that if your application contains badly behaved threads, then typing Tomcat's shutdown.sh command will have very little effect and you have to savagely kill your web server by typing something like:
ps -ef | grep java
to get the pid and then
kill -9 <<pid>>
…and when you have a field of Tomcat web servers to restart, all this extra kerfuffle becomes a severe pain. When you type shutdown.sh you want Tomcat to stop.
In my last couple of blogs the badly behaved threads I created had the following run() methods with the first of these, shown below, being really badly behaved:
  @Override
  public void run() {
    while (true) {
      try {
        DeferredResult<Message> result = resultQueue.take();
        Message message = queue.take();
        result.setResult(message);
      } catch (InterruptedException e) {
        throw new UpdateException("Cannot get latest update. " + e.getMessage(), e);
      }
    }
  }
In this code I've used an infinite while(true) loop, which means that the thread will just keep running and never terminate.
  @Override
  public void run() {
    sleep(5); // Sleep to allow the rest of the app to load
    logger.info("The match has now started...");
    long now = System.currentTimeMillis();
    List<Message> matchUpdates = match.getUpdates();
    for (Message message : matchUpdates) {
      delayUntilNextUpdate(now, message.getTime());
      logger.info("Add message to queue: {}", message.getMessageText());
      queue.add(message);
    }
    start = true; // Game over, can restart
    logger.warn("GAME OVER");
  }
The second example, above, is also pretty badly behaved. It'll keep taking messages from the matchUpdates list and adding them to the message queue at the appropriate moment. The only saving grace of both threads is that they may throw an InterruptedException which, if handled correctly, would cause thread termination; however, this cannot be guaranteed.
There's a quick fix for this, really: all you need to do is to ensure that any threads you create are daemon threads. A daemon thread is a thread that doesn't prevent the JVM from exiting when the program finishes, even though the thread is still running. The usual example of a daemon thread is the JVM's garbage collection thread. To turn your threads into daemon threads you simply call:
    thread.setDaemon(true);
...and when you type shutdown.sh then, WHAM, all your threads will disappear. There is, however, a problem with this. What if one of your daemon threads was doing something important, and chopping it down in its prime resulted in the loss of some pretty important data?
What you need to do is to ensure that all your threads shut down gracefully, completing any work they may be currently undertaking. The rest of this blog demonstrates a fix for these errant threads, gracefully co-ordinating their shutdown by using a ShutdownHook. According to the Javadoc for Runtime.addShutdownHook(), a "shutdown hook is simply an initialized but unstarted thread. When the virtual machine begins its shutdown sequence it will start all registered shutdown hooks in some unspecified order and let them run concurrently." So, after reading the last sentence you may have guessed that what you need to do is to create a thread that has the responsibility of shutting down all your other threads and is passed to the JVM as a shutdown hook. All of this can be generically implemented in a couple of small classes and by performing some jiggery-pokery on your existing thread run() methods.
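To make the quoted definition concrete, here is a minimal sketch of registering a shutdown hook with Runtime.getRuntime().addShutdownHook(). The class name ShutdownHookSketch, the newHook() helper and the cleanedUp flag are my own inventions for illustration and aren't part of the sample project; the only point is that the hook is a Thread you construct but never start yourself.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of the blog's sample project.
class ShutdownHookSketch {

  // Records that the hook body has run; only here to make the sketch observable.
  static final AtomicBoolean cleanedUp = new AtomicBoolean(false);

  // Builds the hook thread. It is created but NOT started: the JVM starts it at shutdown.
  static Thread newHook() {
    return new Thread(() -> {
      cleanedUp.set(true);
      System.out.println("Shutdown hook running: this is where worker threads get stopped");
    }, "shutdown-hook-sketch");
  }

  public static void main(String[] args) {
    Runtime.getRuntime().addShutdownHook(newHook());
    System.out.println("main() finished; the JVM now begins its shutdown sequence");
  }
}
```

Running main() should show the hook's message after main() returns, because a normal exit triggers the shutdown sequence in the same way that Tomcat's shutdown does.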
The two classes to create are a ShutdownService and a Hook. The Hook class, which I'll demonstrate first, is used to link the ShutdownService to your threads. The code for Hook is as follows:
public class Hook {
  private static final Logger logger = LoggerFactory.getLogger(Hook.class);
  private volatile boolean keepRunning = true; // volatile: written by the shutdown hook thread, read by the worker thread
  private final Thread thread;
  Hook(Thread thread) {
    this.thread = thread;
  }
  /**
   * @return True if the daemon thread is to keep running
   */
  public boolean keepRunning() {
    return keepRunning;
  }
  /**
   * Tell the client daemon thread to shutdown and wait for it to close gracefully.
   */
  public void shutdown() {
    keepRunning = false;
    thread.interrupt();
    try {
      thread.join();
    } catch (InterruptedException e) {
      logger.error("Error shutting down thread with hook", e);
    }
  }
}
The Hook contains two instance variables: keepRunning and thread. thread is a reference to the thread that this instance of Hook is responsible for shutting down, whilst keepRunning tells the thread to… keep running.
Hook has two public methods: keepRunning() and shutdown(). keepRunning() is called by the thread to figure out whether it should keep running, and shutdown() is called by the ShutdownService's shutdown hook thread to get your thread to shut down. This is the more interesting of the two methods. Firstly, it sets the keepRunning variable to false. It then calls thread.interrupt() to interrupt the thread, which makes a blocking call such as take() or sleep() throw an InterruptedException. Lastly, it calls thread.join() and waits for the thread to finish.
Note that this technique relies on all your threads co-operating. If there's one badly behaved thread in the mix, then the whole thing could hang. To get around this problem add a timeout to thread.join(…).
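As a rough sketch of that timeout idea, the variant below bounds the wait with Thread.join(long). TimedHook, its timeoutMillis parameter and the boolean return value are assumptions of mine rather than code from the sample project, and the stubborn thread in main() exists only to show what happens when a thread refuses to co-operate.

```java
// Hypothetical variant of the Hook class with a bounded wait; names and timeout are assumptions.
class TimedHook {
  private volatile boolean keepRunning = true;
  private final Thread thread;
  private final long timeoutMillis;

  TimedHook(Thread thread, long timeoutMillis) {
    this.thread = thread;
    this.timeoutMillis = timeoutMillis; // note: join(0) would wait forever
  }

  boolean keepRunning() {
    return keepRunning;
  }

  /** @return true if the thread stopped within the timeout, false if we gave up waiting */
  boolean shutdown() {
    keepRunning = false;
    thread.interrupt();
    try {
      thread.join(timeoutMillis); // returns when the thread dies or the timeout elapses
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
    return !thread.isAlive();
  }

  public static void main(String[] args) {
    // A deliberately badly behaved thread: it swallows interrupts and never checks a flag.
    Thread stubborn = new Thread(() -> {
      while (true) {
        try {
          Thread.sleep(50);
        } catch (InterruptedException e) {
          // ignored on purpose
        }
      }
    }, "stubborn");
    stubborn.setDaemon(true); // so the demo JVM can still exit
    TimedHook hook = new TimedHook(stubborn, 500);
    stubborn.start();
    System.out.println("Stopped in time? " + hook.shutdown());
  }
}
```

Returning a boolean lets the shutdown hook log which threads had to be abandoned instead of hanging on them, which seems a fair trade when the alternative is reaching for kill -9 again.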
@Service
public class ShutdownService {
  private static final Logger logger = LoggerFactory.getLogger(ShutdownService.class);
  private final List<Hook> hooks;
  public ShutdownService() {
    logger.debug("Creating shutdown service");
    hooks = new ArrayList<Hook>();
    createShutdownHook();
  }
  /**
   * Protected for testing
   */
  @VisibleForTesting
  protected void createShutdownHook() {
    ShutdownDaemonHook shutdownHook = new ShutdownDaemonHook();
    Runtime.getRuntime().addShutdownHook(shutdownHook);
  }
  protected class ShutdownDaemonHook extends Thread {
    /**
     * Loop and shutdown all the daemon threads using the hooks
     * 
     * @see java.lang.Thread#run()
     */
    @Override
    public void run() {
      logger.info("Running shutdown sync");
      for (Hook hook : hooks) {
        hook.shutdown();
      }
    }
  }
  /**
   * Create a new instance of the hook class
   */
  public Hook createHook(Thread thread) {
    thread.setDaemon(true);
    Hook retVal = new Hook(thread);
    hooks.add(retVal);
    return retVal;
  }
  @VisibleForTesting
  List<Hook> getHooks() {
    return hooks;
  }
}
The ShutdownService is a Spring service that contains a list of Hook instances, and therefore by inference threads, that it is responsible for shutting down. It also contains an inner class, ShutdownDaemonHook, which extends Thread. An instance of ShutdownDaemonHook is created during the construction of ShutdownService and is then passed to the JVM as a shutdown hook by calling
    Runtime.getRuntime().addShutdownHook(shutdownHook);
The ShutdownService has one public method: createHook(). The first thing that this method does is to ensure that any thread passed to it is converted into a daemon thread. It then creates a new Hook instance, passing in the thread as the argument, before finally both storing the result in a list and returning it to the caller.
The only thing left to do now is to integrate the ShutdownService into DeferredResultService and MatchReporter, the two classes that contain the badly behaved threads.
@Service("DeferredService")
public class DeferredResultService implements Runnable {
  private static final Logger logger = LoggerFactory.getLogger(DeferredResultService.class);
  private final BlockingQueue<DeferredResult<Message>> resultQueue = new LinkedBlockingQueue<>();
  private Thread thread;
  private volatile boolean start = true;
  @Autowired
  private ShutdownService shutdownService;
  private Hook hook;
  @Autowired
  @Qualifier("theQueue")
  private LinkedBlockingQueue<Message> queue;
  @Autowired
  @Qualifier("BillSkyes")
  private MatchReporter matchReporter;
  public void subscribe() {
    logger.info("Starting server");
    matchReporter.start();
    startThread();
  }
  private void startThread() {
    if (start) {
      synchronized (this) {
        if (start) {
          start = false;
          thread = new Thread(this, "Studio Teletype");
          hook = shutdownService.createHook(thread);
          thread.start();
        }
      }
    }
  }
  @Override
  public void run() {
    logger.info("DeferredResultService - Thread running");
    while (hook.keepRunning()) {
      try {
        DeferredResult<Message> result = resultQueue.take();
        Message message = queue.take();
        result.setResult(message);
      } catch (InterruptedException e) {
        System.out.println("Interrupted when waiting for latest update. " + e.getMessage());
      }
    }
    System.out.println("DeferredResultService - Thread ending");
  }
  public void getUpdate(DeferredResult<Message> result) {
    resultQueue.add(result);
  }
}
The first change to this class was to autowire in the ShutdownService instance. The next thing to do is to use the ShutdownService to create an instance of Hook after the creation of the thread but before thread.start() is called:
          thread = new Thread(this, "Studio Teletype");
          hook = shutdownService.createHook(thread);
          thread.start();
The final change is to replace while(true) with:
    while (hook.keepRunning()) {
…telling the thread when to quit the while loop and shut down.
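The ordering of createHook() and thread.start() matters because createHook() calls thread.setDaemon(true), and the JDK rejects setDaemon() on a thread that is already alive with an IllegalThreadStateException. The sketch below shows both orderings; DaemonOrderingSketch and its method names are hypothetical and exist only for this illustration.

```java
// Hypothetical demo class; the names are illustrative and not part of the sample project.
class DaemonOrderingSketch {

  // Correct order: mark the thread as a daemon first, then start it.
  static boolean daemonBeforeStart() {
    Thread worker = new Thread(() -> { }, "early-daemon");
    worker.setDaemon(true);
    boolean daemon = worker.isDaemon();
    worker.start();
    return daemon;
  }

  // Wrong order: returns true if setDaemon(true) was rejected on a running thread.
  static boolean daemonAfterStartIsRejected() {
    Thread worker = new Thread(() -> {
      try {
        Thread.sleep(2000);
      } catch (InterruptedException e) {
        // interrupted by the cleanup below; just exit
      }
    }, "late-daemon");
    worker.start();
    try {
      worker.setDaemon(true);
      return false;
    } catch (IllegalThreadStateException expected) {
      return true;
    } finally {
      worker.interrupt(); // let the sleeping worker finish promptly
    }
  }

  public static void main(String[] args) {
    System.out.println("daemon flag set before start: " + daemonBeforeStart());
    System.out.println("setDaemon after start rejected: " + daemonAfterStartIsRejected());
  }
}
```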
You may have also noticed that there are a few System.out.println() calls thrown into the above code. There is a reason for this: the order in which the shutdown hook threads are executed is unspecified. Remember that not only are your classes trying to shut down gracefully, but other sub-systems are shutting down too. This means that my original code, which called logger.info(…), failed, throwing the following exception:
Exception in thread "Studio Teletype" java.lang.NoClassDefFoundError: org/apache/log4j/spi/ThrowableInformation
    at org.apache.log4j.spi.LoggingEvent.<init>(LoggingEvent.java:159)
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.log(Category.java:856)
    at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:382)
    at com.captaindebug.longpoll.service.DeferredResultService.run(DeferredResultService.java:75)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.ClassNotFoundException: org.apache.log4j.spi.ThrowableInformation
    at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1714)
    at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1559)
    ... 6 more
This is because the logger's classes can no longer be loaded by the time I try to call it; hence the failure. Again, as the documentation states: "Shutdown hooks run at a delicate time in the life cycle of a virtual machine and should therefore be coded defensively. They should, in particular, be written to be thread-safe and to avoid deadlocks insofar as possible. They should also not rely blindly upon services that may have registered their own shutdown hooks and therefore may themselves in the process of shutting down. Attempts to use other thread-based services such as the AWT event-dispatch thread, for example, may lead to deadlocks."
The MatchReporter class has some very similar modifications. The major difference is that the hook.keepRunning() check is inside the run() method's for loop.
public class MatchReporter implements Runnable {
  private static final Logger logger = LoggerFactory.getLogger(MatchReporter.class);
  private final Match match;
  private final Queue<Message> queue;
  private volatile boolean start = true;
  @Autowired
  private ShutdownService shutdownService;
  private Hook hook;
  public MatchReporter(Match theBigMatch, Queue<Message> queue) {
    this.match = theBigMatch;
    this.queue = queue;
  }
  /**
   * Called by Spring after loading the context. Will "kick off" the match...
   */
  public void start() {
    if (start) {
      synchronized (this) {
        if (start) {
          start = false;
          logger.info("Starting the Match Reporter...");
          String name = match.getName();
          Thread thread = new Thread(this, name);
          hook = shutdownService.createHook(thread);
          thread.start();
        }
      }
    } else {
      logger.warn("Game already in progress");
    }
  }
  /**
   * The main run loop
   */
  @Override
  public void run() {
    sleep(5); // Sleep to allow the rest of the app to load
    logger.info("The match has now started...");
    long now = System.currentTimeMillis();
    List<Message> matchUpdates = match.getUpdates();
    for (Message message : matchUpdates) {
      delayUntilNextUpdate(now, message.getTime());
      if (!hook.keepRunning()) {
        break;
      }
      logger.info("Add message to queue: {}", message.getMessageText());
      queue.add(message);
    }
    start = true; // Game over, can restart
    logger.warn("GAME OVER");
  }
  private void sleep(int delay) {
    try {
      TimeUnit.SECONDS.sleep(delay); // use the argument rather than a hard-coded 10 seconds
    } catch (InterruptedException e) {
      logger.info("Sleep interrupted...");
    }
  }
  private void delayUntilNextUpdate(long now, long messageTime) {
    while (System.currentTimeMillis() < now + messageTime) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        logger.info("MatchReporter Thread interrupted...");
      }
    }
  }
}
The ultimate test of this code is to issue a Tomcat shutdown.sh command halfway through the match update sequence. As the JVM terminates it'll start the shutdown hook, an instance of the ShutdownDaemonHook class. As this class's run() method executes, it loops through the list of Hook instances, telling them to close down their respective threads. If you tail -f your server's log file (in my case catalina.out, but your Tomcat may be configured differently to mine), you'll see the trail of entries showing your server shutting down gracefully.
The code that accompanies this blog is available on Github at: https://github.com/roghughe/captaindebug/tree/master/long-poll.

 
 
4 comments:
Great article, but does this hook work with other servlet engines (like Jetty, for example)?
Hi,
Thanks for the comment. This technique will work for any multi-threaded Java application, including Jetty.
Hello,
Well remembered, I've seen more than one application locking the app server on shutdown, typically when there was some batch(-ish) work to do.
I think you can remove the dependency between the workers (DeferredResultService and MatchReporter) and the Hook if they used Thread.interrupted or Thread.currentThread().isInterrupted as the termination condition.
Just as a quick note, it's also possible to do the association of threads and hooks in a ThreadFactory used by a ThreadPoolExecutor, which is injected in the worker instances so that they submit themselves to it, and then have the shutdown hook call shutdownNow() on the pool.
I understand that if the article used any of this it would probably get a bit too complicated and miss the central point, though. But I thought I should just mention it.
Unknown,
I thought that I'd try your idea of using Thread.interrupted() or Thread.currentThread().isInterrupted() as it sounds rather plausible. Unfortunately, it doesn't work, as the status of a thread's interrupted flag after the thread.interrupt() call depends upon how the thread is blocking.
From the JavaDocs for Thread.interrupt(): "If this thread is blocked in an invocation of the wait(), wait(long), or wait(long, int) methods of the Object class, or of the join(),join(long), join(long, int), sleep(long), or sleep(long, int), methods of this class, then its interrupt status will be cleared and it will receive an InterruptedException.
If this thread is blocked in an I/O operation upon an interruptible channel then the channel will be closed, the thread's interrupt status will be set, and the thread will receive a ClosedByInterruptException."
In my example, the thread will be blocking using thread.sleep(…) and therefore the flag will be cleared and any subsequent test of thread.isInterrupted() will return false. Therefore the idea will only work if you can guarantee that a thread is blocking on an I/O operation when you call thread.interrupt().