Task Cancellation

An activity is cancellable if external code can move it to completion before its normal completion. There are a number of reasons why you might want to cancel an activity:

User-requested cancellation. The user clicked on the "cancel" button in a GUI application, or requested cancellation through a management interface such as JMX (Java Management Extensions).

Time-limited activities. An application searches a problem space for a finite amount of time and chooses the best solution found within that time. When the timer expires, any tasks still searching are cancelled.

Application events. An application searches a problem space by decomposing it so that different tasks search different regions of the problem space. When one task finds a solution, all other tasks still searching are cancelled.

Errors. A web crawler searches for relevant pages, storing pages or summary data to disk. When a crawler task encounters an error (for example, the disk is full), other crawling tasks are cancelled, possibly recording their current state so that they can be restarted later.

Shutdown. When an application or service is shut down, something must be done about work that is currently being processed or queued for processing. In a graceful shutdown, tasks currently in progress might be allowed to complete; in a more immediate shutdown, currently executing tasks might be cancelled.

There is no safe way to preemptively stop a thread in Java, and therefore no safe way to preemptively stop a task. There are only cooperative mechanisms, by which the task and the code requesting cancellation follow an agreed-upon protocol.

One such cooperative mechanism is setting a "cancellation requested" flag that the task checks periodically; if it finds the flag set, the task terminates early. PrimeGenerator in Listing 7.1, which enumerates prime numbers until it is cancelled, illustrates this technique. The cancel method sets the cancelled flag, and the main loop polls this flag before searching for the next prime number. (For this to work reliably, cancelled must be volatile.)

Listing 7.2 shows a sample use of this class that lets the prime generator run for one second before cancelling it. The generator won't necessarily stop after exactly one second, since there may be some delay between the time that cancellation is requested and the time that the run loop next checks for cancellation. The cancel method is called from a finally block to ensure that the prime generator is cancelled even if the call to sleep is interrupted. If cancel were not called, the prime-seeking thread would run forever, consuming CPU cycles and preventing the JVM from exiting.

A task that wants to be cancellable must have a cancellation policy that specifies the "how", "when", and "what" of cancellation: how other code can request cancellation, when the task checks whether cancellation has been requested, and what actions the task takes in response to a cancellation request.

Consider the real-world example of stopping payment on a check. Banks have rules about how to submit a stop-payment request, what responsiveness guarantees they make in processing such requests, and what procedures they follow when payment is actually stopped (such as notifying the other bank involved in the transaction and assessing a fee against the payor's account). Taken together, these procedures and guarantees make up the cancellation policy for check payment.

Listing 7.1. Using a Volatile Field to Hold Cancellation State.

@ThreadSafe
public class PrimeGenerator implements Runnable {
    @GuardedBy("this")
    private final List<BigInteger> primes = new ArrayList<BigInteger>();
    private volatile boolean cancelled;

    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {                 // poll the flag once per prime
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel() { cancelled = true; }

    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes);   // defensive copy
    }
}

Listing 7.2. Generating a Second's Worth of Prime Numbers.

List<BigInteger> aSecondOfPrimes() throws InterruptedException {
    PrimeGenerator generator = new PrimeGenerator();
    new Thread(generator).start();
    try {
        SECONDS.sleep(1);
    } finally {
        generator.cancel();   // cancel even if sleep is interrupted
    }
    return generator.get();
}

PrimeGenerator uses a simple cancellation policy: client code requests cancellation by calling cancel, and PrimeGenerator checks for cancellation once per prime found and exits when it detects that cancellation has been requested.

7.1.1. Interruption

The cancellation mechanism in PrimeGenerator will eventually cause the prime-seeking task to exit, but it might take a while. If, however, a task that uses this approach calls a blocking method such as BlockingQueue.put, we could have a more serious problem: the task might never check the cancellation flag and therefore might never terminate.

BrokenPrimeProducer in Listing 7.3 illustrates this problem. The producer thread generates primes and places them on a blocking queue. If the producer gets ahead of the consumer, the queue will fill up and put will block. What happens if the consumer tries to cancel the producer task while it is blocked in put? It can call cancel, which will set the cancelled flag, but the producer will never check the flag because it will never emerge from the blocking put (because the consumer has stopped retrieving primes from the queue).

As we hinted in Chapter 5, certain blocking library methods support interruption. Thread interruption is a cooperative mechanism for a thread to signal another thread that it should, at its convenience and if it feels like it, stop what it is doing and do something else.

There is nothing in the API or language specification that ties interruption to any specific cancellation semantics, but in practice, using interruption for anything but cancellation is fragile and difficult to sustain in larger applications.

Each thread has a boolean interrupted status; interrupting a thread sets its interrupted status to true. Thread contains methods for interrupting a thread and querying the interrupted status of a thread, as shown in Listing 7.4. The interrupt method interrupts the target thread, and isInterrupted returns the interrupted status of the target thread. The poorly named static interrupted method clears the interrupted status of the current thread and returns its previous value; this is the only way to clear the interrupted status.

Blocking library methods like Thread.sleep and Object.wait try to detect when a thread has been interrupted and return early. They respond to interruption by clearing the interrupted status and throwing InterruptedException, indicating that the blocking operation completed early due to interruption. The JVM makes no guarantees on how quickly a blocking method will detect interruption, but in practice this happens reasonably quickly.
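The clearing behavior described above is easy to observe. The following is a minimal sketch (the class name SleepInterruptDemo and its method are hypothetical, introduced only for illustration) in which a thread blocked in Thread.sleep is interrupted and then inspects its own interrupted status inside the catch block.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not one of the chapter's listings.
final class SleepInterruptDemo {

    /** Returns the sleeper's interrupted status as observed inside its catch block. */
    static boolean statusSeenAfterInterruptedSleep() {
        AtomicBoolean statusInCatch = new AtomicBoolean(true);
        CountDownLatch started = new CountDownLatch(1);

        Thread sleeper = new Thread(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);   // interruptible blocking call
            } catch (InterruptedException e) {
                // sleep cleared the interrupted status before throwing
                statusInCatch.set(Thread.currentThread().isInterrupted());
            }
        });

        sleeper.start();
        try {
            started.await();
            sleeper.interrupt();        // request that the sleeper stop waiting
            sleeper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return statusInCatch.get();
    }

    public static void main(String[] args) {
        System.out.println("status inside catch: " + statusSeenAfterInterruptedSleep());
    }
}
```

Because the status is cleared when the exception is thrown, code that catches InterruptedException and wants callers to know about the interrupt has to take an explicit step to preserve it.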

Listing 7.3. Unreliable Cancellation that can Leave Producers Stuck in a Blocking Operation. Don't Do this.

class BrokenPrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;
    private volatile boolean cancelled = false;

    BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!cancelled)
                queue.put(p = p.nextProbablePrime());   // may block forever
        } catch (InterruptedException consumed) { }
    }

    public void cancel() { cancelled = true; }
}

void consumePrimes() throws InterruptedException {
    BlockingQueue<BigInteger> primes = ...;
    BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
    producer.start();
    try {
        while (needMorePrimes())
            consume(primes.take());
    } finally {
        producer.cancel();
    }
}

Listing 7.4. Interruption Methods in Thread.

public class Thread {
    public void interrupt() { ... }
    public boolean isInterrupted() { ... }
    public static boolean interrupted() { ... }
    ...
}

If a thread is interrupted when it is not blocked, its interrupted status is set, and it is up to the activity being cancelled to poll the interrupted status to detect interruption. In this way interruption is "sticky": if it doesn't trigger an InterruptedException, evidence of interruption persists until someone deliberately clears the interrupted status.
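A short sketch can make the sticky behavior concrete. The class StickyInterruptDemo below is hypothetical; it has the current thread interrupt itself while not blocked, queries the status twice with isInterrupted, and then clears it with the static interrupted method.

```java
// Hypothetical demo class; not one of the chapter's listings.
final class StickyInterruptDemo {

    /**
     * Returns, in order: status before interrupting, status after interrupting,
     * status after a second query, value returned by Thread.interrupted(),
     * and status after that call.
     */
    static boolean[] observe() {
        // Start from a known state, remembering any interrupt that was pending.
        boolean pending = Thread.interrupted();
        Thread self = Thread.currentThread();
        try {
            boolean before = self.isInterrupted();
            self.interrupt();                            // not blocked: only sets the status
            boolean afterInterrupt = self.isInterrupted();
            boolean afterSecondQuery = self.isInterrupted(); // querying does not clear it
            boolean returned = Thread.interrupted();     // returns the status and clears it
            boolean afterClear = self.isInterrupted();
            return new boolean[] {
                before, afterInterrupt, afterSecondQuery, returned, afterClear
            };
        } finally {
            if (pending) {
                self.interrupt();                        // put back what we took
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(observe()));
    }
}
```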

Calling interrupt does not necessarily stop the target thread from doing what it is doing; it merely delivers the message that interruption has been requested.

A good way to think about interruption is that it does not actually interrupt a running thread; it just requests that the thread interrupt itself at the next convenient opportunity. (These opportunities are called cancellation points.) Some methods, such as wait, sleep, and join, take such requests seriously, throwing an exception when they receive an interrupt request or encounter an already set interrupt status upon entry. Well-behaved methods may totally ignore such requests so long as they leave the interruption request in place so that calling code can do something with it. Poorly behaved methods swallow the interrupt request, thus denying code further up the call stack the opportunity to act on it.

The static interrupted method should be used with caution, because it clears the current thread's interrupted status. If you call interrupted and it returns true, unless you are planning to swallow the interruption, you should do something with it: either throw InterruptedException or restore the interrupted status by calling interrupt again, as in Listing 5.10 on page 94.

BrokenPrimeProducer illustrates how custom cancellation mechanisms do not always interact well with blocking library methods. If you code your tasks to be responsive to interruption, you can use interruption as your cancellation mechanism and take advantage of the interruption support provided by many library classes.

Interruption is usually the most sensible way to implement cancellation.

BrokenPrimeProducer can be easily fixed (and simplified) by using interruption instead of a boolean flag to request cancellation, as shown in Listing 7.5. There are two points in each loop iteration where interruption may be detected: in the blocking put call, and by explicitly polling the interrupted status in the loop header. The explicit test is not strictly necessary here because of the blocking put call, but it makes PrimeProducer more responsive to interruption because it checks for interruption before starting the lengthy task of searching for a prime, rather than after. When calls to interruptible blocking methods are not frequent enough to deliver the desired responsiveness, explicitly testing the interrupted status can help.

Listing 7.5. Using Interruption for Cancellation.

class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* Allow thread to exit */
        }
    }

    public void cancel() { interrupt(); }
}

7.1.2. Interruption Policies

Just as tasks should have a cancellation policy, threads should have an interruption policy. An interruption policy determines how a thread interprets an interruption request: what it does (if anything) when one is detected, what units of work are considered atomic with respect to interruption, and how quickly it reacts to interruption.

The most sensible interruption policy is some form of thread-level or service-level cancellation: exit as quickly as practical, cleaning up if necessary, and possibly notifying some owning entity that the thread is exiting. It is possible to establish other interruption policies, such as pausing or resuming a service, but threads or thread pools with nonstandard interruption policies may need to be restricted to tasks that have been written with an awareness of the policy.

It is important to distinguish between how tasks and threads should react to interruption. A single interrupt request may have more than one desired recipient: interrupting a worker thread in a thread pool can mean both "cancel the current task" and "shut down the worker thread".

Tasks do not execute in threads they own; they borrow threads owned by a service such as a thread pool. Code that doesn't own the thread (for a thread pool, any code outside of the thread pool implementation) should be careful to preserve the interrupted status so that the owning code can eventually act on it, even if the "guest" code acts on the interruption as well. (If you are house-sitting for someone, you don't throw out the mail that comes while they're away; you save it and let them deal with it when they get back, even if you do read their magazines.)

This is why most blocking library methods simply throw InterruptedException in response to an interrupt. They will never execute in a thread they own, so they implement the most reasonable cancellation policy for task or library code: get out of the way as quickly as possible and communicate the interruption back to the caller so that code higher up on the call stack can take further action.

A task needn't necessarily drop everything when it detects an interruption request; it can choose to postpone it until a more opportune time by remembering that it was interrupted, finishing the task it was performing, and then throwing InterruptedException or otherwise indicating interruption. This technique can protect data structures from corruption when an activity is interrupted in the middle of an update.

A task should not assume anything about the interruption policy of its executing thread unless it is explicitly designed to run within a service that has a specific interruption policy. Whether a task interprets interruption as cancellation or takes some other action on interruption, it should take care to preserve the executing thread's interruption status. If it is not simply going to propagate InterruptedException to its caller, it should restore the interruption status after catching InterruptedException:

Thread.currentThread().interrupt();
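To see why restoring the status matters, consider the following sketch. The names RestoreInterruptDemo and PoliteTask are hypothetical: the task plays the role of "guest" code that cannot propagate InterruptedException from run, and the lambda that invokes it plays the role of the thread's owner, which checks the status after the task returns.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not one of the chapter's listings.
final class RestoreInterruptDemo {

    /** Guest code: run() cannot declare InterruptedException, so it restores the status. */
    static final class PoliteTask implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // preserve evidence for the owner
            }
        }
    }

    /** Returns whether the owning code still sees the interrupt after the task returns. */
    static boolean ownerSeesInterrupt() {
        AtomicBoolean seenByOwner = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            new PoliteTask().run();                                   // guest code
            seenByOwner.set(Thread.currentThread().isInterrupted());  // owner code
        });
        worker.start();
        worker.interrupt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return seenByOwner.get();
    }

    public static void main(String[] args) {
        System.out.println("owner sees interrupt: " + ownerSeesInterrupt());
    }
}
```

Had PoliteTask left its catch block empty, the owner would see a cleared status and would have no way to learn that an interrupt had been requested.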

Just as task code should not make assumptions about what interruption means to its executing thread, cancellation code should not make assumptions about the interruption policy of arbitrary threads. A thread should be interrupted only by its owner; the owner can encapsulate knowledge of the thread's interruption policy in an appropriate cancellation mechanism such as a shutdown method.

Because each thread has its own interruption policy, you should not interrupt a thread unless you know what interruption means to that thread.

Critics have derided the Java interruption facility because it does not provide a preemptive interruption capability and yet forces developers to handle InterruptedException. However, the ability to postpone an interruption request enables developers to craft flexible interruption policies that balance responsiveness and robustness as appropriate for the application.

7.1.3. Responding to Interruption

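As mentioned in Section 5.4, when you call an interruptible blocking method such as Thread.sleep or BlockingQueue.put, there are two practical strategies for handling InterruptedException:

Propagate the exception (possibly after some task-specific cleanup), making your method an interruptible blocking method too; or

Restore the interruption status so that code higher up on the call stack can deal with it.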

Propagating InterruptedException can be as easy as adding InterruptedException to the throws clause, as shown by getNextTask in Listing 7.6.

Listing 7.6. Propagating InterruptedException to Callers.

BlockingQueue<Task> queue;
...
public Task getNextTask() throws InterruptedException {
    return queue.take();
}

If you don't want to or cannot propagate InterruptedException (perhaps because your task is defined by a Runnable), you need to find another way to preserve the interruption request. The standard way to do this is to restore the interrupted status by calling interrupt again. What you should not do is swallow the InterruptedException by catching it and doing nothing in the catch block, unless your code is actually implementing the interruption policy for a thread. PrimeProducer swallows the interrupt, but does so with the knowledge that the thread is about to terminate and that therefore there is no code higher up on the call stack that needs to know about the interruption. Most code does not know what thread it will run in and so should preserve the interrupted status.

Only code that implements a thread's interruption policy may swallow an interruption request. General-purpose task and library code should never swallow interruption requests.

Activities that do not support cancellation but still call interruptible blocking methods will have to call them in a loop, retrying when interruption is detected. In this case, they should save the interruption status locally and restore it just before returning, as shown in Listing 7.7, rather than immediately upon catching InterruptedException. Setting the interrupted status too early could result in an infinite loop, because most interruptible blocking methods check the interrupted status on entry and throw InterruptedException immediately if it is set. (Interruptible methods usually poll for interruption before blocking or doing any significant work, so as to be as responsive to interruption as possible.)

If your code does not call interruptible blocking methods, it can still be made responsive to interruption by polling the current thread's interrupted status throughout the task code. Choosing a polling frequency is a tradeoff between efficiency and responsiveness. If you have high responsiveness requirements, you cannot call potentially long-running methods that are not themselves responsive to interruption, potentially restricting your options for calling library code.
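As a rough illustration of polling, the sketch below runs a CPU-bound loop that calls no blocking methods and checks the interrupted status once every 1024 iterations. The class names and the polling interval are assumptions chosen for the example; a real task would pick an interval that matches its own responsiveness needs.

```java
// Hypothetical demo class; not one of the chapter's listings.
final class PollingDemo {

    /** A CPU-bound task that never blocks, so it must poll to notice interruption. */
    static final class Spinner implements Runnable {
        volatile boolean exitedWithStatusSet;

        @Override
        public void run() {
            long n = 0;
            while (true) {
                // Poll occasionally: cheaper than every iteration, still responsive.
                if ((n & 1023) == 0 && Thread.currentThread().isInterrupted()) {
                    break;
                }
                n++;
            }
            // isInterrupted() does not clear the status, so it is still set here.
            exitedWithStatusSet = Thread.currentThread().isInterrupted();
        }
    }

    /** Returns true if the spinner stopped after being interrupted and left the status set. */
    static boolean stopsWhenInterrupted() {
        Spinner spinner = new Spinner();
        Thread t = new Thread(spinner);
        t.start();
        try {
            Thread.sleep(50);       // let it spin briefly
            t.interrupt();
            t.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return !t.isAlive() && spinner.exitedWithStatusSet;
    }

    public static void main(String[] args) {
        System.out.println("stopped on interrupt: " + stopsWhenInterrupted());
    }
}
```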

Cancellation can involve state other than the interruption status; interruption can be used to get the thread's attention, and information stored elsewhere by the interrupting thread can be used to provide further instructions for the interrupted thread. (Be sure to use synchronization when accessing that information.)

Listing 7.7. Noncancelable Task that Restores Interruption Before Exit.

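public Task getNextTask(BlockingQueue<Task> queue) {
    boolean interrupted = false;
    try {
        while (true) {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                interrupted = true;
                // fall through and retry
            }
        }
    } finally {
        if (interrupted)
            Thread.currentThread().interrupt();
    }
}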

For example, when a worker thread owned by a ThreadPoolExecutor detects interruption, it checks whether the pool is being shut down. If so, it performs some pool cleanup before terminating; otherwise it may create a new thread to restore the thread pool to the desired size.
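The same idea of combining interruption with other state can be sketched in a few lines. In the hypothetical example below (InterruptWithReasonDemo, Worker, and the Request enum are all invented for illustration), the controlling thread first publishes an instruction through an AtomicReference and then interrupts the worker to get its attention; the worker, which owns its thread and therefore its interruption policy, reads the instruction when the interrupt arrives.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not one of the chapter's listings.
final class InterruptWithReasonDemo {

    enum Request { NONE, PAUSE, SHUTDOWN }

    static final class Worker extends Thread {
        private final AtomicReference<Request> pending =
                new AtomicReference<>(Request.NONE);
        private volatile Request handled = Request.NONE;

        /** Called by the controlling thread. */
        void send(Request r) {
            pending.set(r);   // publish the instruction first...
            interrupt();      // ...then get the worker's attention
        }

        @Override
        public void run() {
            try {
                Thread.sleep(60_000);   // stands in for blocking work
            } catch (InterruptedException e) {
                // This thread implements its own interruption policy and is about
                // to exit, so it may consume the interrupt and consult the state.
                handled = pending.get();
            }
        }

        Request handled() { return handled; }
    }

    /** Starts a worker, sends it a request, and returns what the worker observed. */
    static Request deliver(Request r) {
        Worker w = new Worker();
        w.start();
        w.send(r);
        try {
            w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return w.handled();
    }

    public static void main(String[] args) {
        System.out.println("worker handled: " + deliver(Request.SHUTDOWN));
    }
}
```

Setting the instruction before calling interrupt matters: the worker may react to the interrupt immediately, and it should find the instruction already in place.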

7.1.4. Example: Timed Run

Many problems can take forever to solve (e.g., enumerate all the prime numbers); for others, the answer might be found reasonably quickly but also might take forever. Being able to say "spend up to ten minutes looking for the answer" or "enumerate all the answers you can in ten minutes" can be useful in these situations.

The aSecondOfPrimes method in Listing 7.2 starts a PrimeGenerator and cancels it after a second. While the PrimeGenerator might take somewhat longer than a second to stop, it will eventually notice the cancellation request and stop, allowing the thread to terminate. But another aspect of executing a task is that you want to find out if the task throws an exception. If PrimeGenerator throws an unchecked exception before the timeout expires, it will probably go unnoticed, since the prime generator runs in a separate thread that does not explicitly handle exceptions.

Listing 7.8 shows an attempt at running an arbitrary Runnable for a given amount of time. It runs the task in the calling thread and schedules a cancellation task to interrupt it after a given time interval. This addresses the problem of unchecked exceptions thrown from the task, since they can then be caught by the caller of timedRun.

This is an appealingly simple approach, but it violates the rules: you should know a thread's interruption policy before interrupting it. Since timedRun can be called from an arbitrary thread, it cannot know the calling thread's interruption policy. If the task completes before the timeout, the cancellation task that interrupts the thread in which timedRun was called could go off after timedRun has returned to its caller. We don't know what code will be running when that happens, but the result won't be good. (It is possible but surprisingly tricky to eliminate this risk by using the ScheduledFuture returned by schedule to cancel the cancellation task.)

Listing 7.8. Scheduling an Interrupt on a Borrowed Thread. Don't Do this.

private static final ScheduledExecutorService cancelExec = ...;

public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
    final Thread taskThread = Thread.currentThread();
    cancelExec.schedule(new Runnable() {
        public void run() { taskThread.interrupt(); }
    }, timeout, unit);
    r.run();
}

Further, if the task is not responsive to interruption, timedRun will not return until the task finishes, which may be long after the desired timeout (or even not at all). A timed run service that doesn't return after the specified time is likely to be irritating to its callers.

Listing 7.9 addresses the exception-handling problem of aSecondOfPrimes and the problems with the previous attempt. The thread created to run the task can have its own execution policy, and even if the task doesn't respond to the interrupt, the timed run method can still return to its caller. After starting the task thread, timedRun executes a timed join with the newly created thread. After join returns, it checks if an exception was thrown from the task and if so, rethrows it in the thread calling timedRun. The saved Throwable is shared between the two threads, and so is declared volatile to safely publish it from the task thread to the timedRun thread.

This version addresses the problems in the previous examples, but because it relies on a timed join, it shares a deficiency with join: we don't know if control was returned because the thread exited normally or because the join timed out.[2]

[2] This is a flaw in the Thread API, because whether or not the join completes successfully has memory visibility consequences in the Java Memory Model, but join does not return a status indicating whether it was successful.
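One common workaround, shown here only as a sketch with hypothetical names, is to follow the timed join with a call to isAlive. If isAlive returns false, the thread has terminated; if it returns true, the join must have timed out (or the thread finished in the instant after the check, which a caller usually treats the same way).

```java
// Hypothetical helper; not one of the chapter's listings.
final class TimedJoinDemo {

    /**
     * Waits up to the given number of milliseconds for the thread to finish.
     * Returns true if the thread has terminated, false if it is still running
     * or if the waiting thread was interrupted.
     */
    static boolean joinAndReport(Thread t, long millis) {
        try {
            t.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the caller's interrupt
            return false;
        }
        return !t.isAlive();                      // distinguishes exit from timeout
    }

    public static void main(String[] args) {
        Thread quick = new Thread(() -> { });
        quick.start();
        System.out.println("quick finished: " + joinAndReport(quick, 5_000));
    }
}
```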

7.1.5. Cancellation Via Future

We've already used an abstraction for managing the lifecycle of a task, dealing with exceptions, and facilitating cancellation: Future. Following the general principle that it is better to use existing library classes than to roll your own, let's build timedRun using Future and the task execution framework.

Listing 7.9. Interrupting a Task in a Dedicated Thread.

public static void timedRun(final Runnable r, long timeout, TimeUnit unit)
        throws InterruptedException {
    class RethrowableTask implements Runnable {
        private volatile Throwable t;

        public void run() {
            try {
                r.run();
            } catch (Throwable t) {
                this.t = t;
            }
        }

        void rethrow() {
            if (t != null)
                throw launderThrowable(t);
        }
    }

    RethrowableTask task = new RethrowableTask();
    final Thread taskThread = new Thread(task);
    taskThread.start();
    cancelExec.schedule(new Runnable() {
        public void run() { taskThread.interrupt(); }
    }, timeout, unit);
    taskThread.join(unit.toMillis(timeout));
    task.rethrow();
}

ExecutorService.submit returns a Future describing the task. Future has a cancel method that takes a boolean argument, mayInterruptIfRunning, and returns a value indicating whether the cancellation attempt was successful. (This tells you only whether it was able to deliver the interruption, not whether the task detected and acted on it.) When mayInterruptIfRunning is true and the task is currently running in some thread, then that thread is interrupted. Setting this argument to false means "don't run this task if it hasn't started yet", and should be used for tasks that are not designed to handle interruption.
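The two values of mayInterruptIfRunning can be compared side by side. The following sketch uses a single-threaded executor and hypothetical class and method names: the first method cancels a task that is already running with cancel(true), and the second cancels a task that is still queued with cancel(false).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not one of the chapter's listings.
final class FutureCancelDemo {

    /** Returns {value returned by cancel(true), isCancelled(), task saw the interrupt}. */
    static boolean[] cancelRunningTask() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch running = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        try {
            Future<?> f = exec.submit(() -> {
                running.countDown();
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    sawInterrupt.set(true);   // cancel(true) interrupted this thread
                } finally {
                    done.countDown();
                }
            });
            running.await();                  // make sure the task has started
            boolean result = f.cancel(true);
            done.await();                     // wait for the task to react
            return new boolean[] { result, f.isCancelled(), sawInterrupt.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        } finally {
            exec.shutdownNow();
        }
    }

    /** Returns true if a queued task cancelled with cancel(false) never ran. */
    static boolean queuedTaskNeverRuns() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean ran = new AtomicBoolean(false);
        try {
            exec.submit(() -> {               // occupies the only worker thread
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            Future<?> queued = exec.submit(() -> ran.set(true));
            boolean cancelled = queued.cancel(false);   // not started yet: just don't run it
            release.countDown();
            exec.shutdown();
            exec.awaitTermination(5, TimeUnit.SECONDS);
            return cancelled && !ran.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(cancelRunningTask()));
        System.out.println("queued task skipped: " + queuedTaskNeverRuns());
    }
}
```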

Since you shouldn't interrupt a thread unless you know its interruption policy, when is it OK to call cancel with an argument of true? The task execution threads created by the standard Executor implementations implement an interruption policy that lets tasks be cancelled using interruption, so it is safe to set mayInterruptIfRunning when cancelling tasks through their Futures when they are running in a standard Executor. You should not interrupt a pool thread directly when attempting to cancel a task, because you won't know what task is running when the interrupt request is delivered; do this only through the task's Future. This is yet another reason to code tasks to treat interruption as a cancellation request: then they can be cancelled through their Futures.

Listing 7.10 shows a version of timedRun that submits the task to an ExecutorService and retrieves the result with a timed Future.get. If get terminates with a TimeoutException, the task is cancelled via its Future. (To simplify coding, this version calls Future.cancel unconditionally in a finally block, taking advantage of the fact that cancelling a completed task has no effect.) If the underlying computation throws an exception prior to cancellation, it is rethrown from timedRun, which is the most convenient way for the caller to deal with the exception. Listing 7.10 also illustrates another good practice: cancelling tasks whose result is no longer needed. (This technique was also used in Listing 6.13 on page 128 and Listing 6.16 on page 132.)

Listing 7.10. Cancelling a Task Using Future.

public static void timedRun(Runnable r, long timeout, TimeUnit unit)
        throws InterruptedException {
    Future<?> task = taskExec.submit(r);
    try {
        task.get(timeout, unit);
    } catch (TimeoutException e) {
        // task will be cancelled below
    } catch (ExecutionException e) {
        // exception thrown in task; rethrow
        throw launderThrowable(e.getCause());
    } finally {
        // Harmless if task already completed
        task.cancel(true); // interrupt if running
    }
}

When Future.get throws InterruptedException or TimeoutException and you know that the result is no longer needed by the program, cancel the task with Future.cancel.

 

7.1.6. Dealing with Non-interruptible Blocking

Many blocking library methods respond to interruption by returning early and throwing InterruptedException, which makes it easier to build tasks that are responsive to cancellation. However, not all blocking methods or blocking mechanisms are responsive to interruption; if a thread is blocked performing synchronous socket I/O or waiting to acquire an intrinsic lock, interruption has no effect other than setting the thread's interrupted status. We can sometimes convince threads blocked in noninterruptible activities to stop by means similar to interruption, but this requires greater awareness of why the thread is blocked.

Synchronous socket I/O in java.io. The common form of blocking I/O in server applications is reading or writing to a socket. Unfortunately, the read and write methods in InputStream and OutputStream are not responsive to interruption, but closing the underlying socket makes any threads blocked in read or write throw a SocketException.

Synchronous I/O in java.nio. Interrupting a thread waiting on an InterruptibleChannel closes the channel and causes that thread to throw ClosedByInterruptException; because the channel is now closed, any other threads blocked on it throw AsynchronousCloseException. Closing an InterruptibleChannel directly likewise causes threads blocked on channel operations to throw AsynchronousCloseException. Most standard Channels implement InterruptibleChannel.

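Asynchronous I/O with Selector. If a thread is blocked in Selector.select (in java.nio.channels), calling wakeup causes select to return immediately without throwing an exception. Calling close also wakes the blocked thread, and any later selection attempt on the closed selector throws ClosedSelectorException.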

Lock acquisition. If a thread is blocked waiting for an intrinsic lock, there is nothing you can do to stop it short of ensuring that it eventually acquires the lock and makes enough progress that you can get its attention some other way. However, the explicit Lock classes offer the lockInterruptibly method, which allows you to wait for a lock and still be responsive to interrupts; see Chapter 13.
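Of the cases above, interruptible lock acquisition is the simplest to demonstrate without any I/O. In the sketch below (the class and method names are hypothetical), the calling thread holds a ReentrantLock for the whole demonstration, so a second thread calling lockInterruptibly can never acquire it and can leave only by being interrupted.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not one of the chapter's listings.
final class LockInterruptiblyDemo {

    /** Returns true if the waiting thread abandoned the lock attempt when interrupted. */
    static boolean waiterGivesUpWhenInterrupted() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean gaveUp = new AtomicBoolean(false);

        lock.lock();   // hold the lock so the waiter cannot acquire it
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();   // responds to interruption while waiting
                    try {
                        // critical section; never reached in this demonstration
                    } finally {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    gaveUp.set(true);           // stopped waiting without the lock
                }
            });
            waiter.start();
            waiter.interrupt();
            waiter.join();
            return gaveUp.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("waiter gave up: " + waiterGivesUpWhenInterrupted());
    }
}
```

A thread blocked on a synchronized block in the same position would simply keep waiting, with only its interrupted status set.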

ReaderThread in Listing 7.11 shows a technique for encapsulating nonstandard cancellation. ReaderThread manages a single socket connection, reading synchronously from the socket and passing any data received to processBuffer. To facilitate terminating a user connection or shutting down the server, ReaderThread overrides interrupt to both deliver a standard interrupt and close the underlying socket; thus interrupting a ReaderThread makes it stop what it is doing whether it is blocked in read or in an interruptible blocking method.

7.1.7. Encapsulating Nonstandard Cancellation with newTaskFor

The technique used in ReaderThread to encapsulate nonstandard cancellation can be refined using the newTaskFor hook added to ThreadPoolExecutor in Java 6. When a Callable is submitted to an ExecutorService, submit returns a Future that can be used to cancel the task. The newTaskFor hook is a factory method that creates the Future representing the task. It returns a RunnableFuture, an interface that extends both Future and Runnable (and is implemented by FutureTask).

Customizing the task Future allows you to override Future.cancel. Custom cancellation code can perform logging or gather statistics on cancellation, and can also be used to cancel activities that are not responsive to interruption. ReaderThread encapsulates cancellation of socket-using threads by overriding interrupt; the same can be done for tasks by overriding Future.cancel.

CancellableTask in Listing 7.12 defines a CancellableTask interface that extends Callable and adds a cancel method and a newTask factory method for constructing a RunnableFuture. CancellingExecutor extends ThreadPoolExecutor, and overrides newTaskFor to let a CancellableTask create its own Future.

Listing 7.11. Encapsulating Nonstandard Cancellation in a Thread by Overriding Interrupt.

public class ReaderThread extends Thread {
    private final Socket socket;
    private final InputStream in;

    public ReaderThread(Socket socket) throws IOException {
        this.socket = socket;
        this.in = socket.getInputStream();
    }

    public void interrupt() {
        try {
            socket.close();          // unblocks a thread stuck in read
        } catch (IOException ignored) {
        } finally {
            super.interrupt();       // still deliver the standard interrupt
        }
    }

    public void run() {
        try {
            byte[] buf = new byte[BUFSZ];
            while (true) {
                int count = in.read(buf);
                if (count < 0)
                    break;
                else if (count > 0)
                    processBuffer(buf, count);
            }
        } catch (IOException e) {
            /* Allow thread to exit */
        }
    }
}

SocketUsingTask implements CancellableTask and defines Future.cancel to close the socket as well as call super.cancel. If a SocketUsingTask is cancelled through its Future, the socket is closed and the executing thread is interrupted. This increases the task's responsiveness to cancellation: not only can it safely call interruptible blocking methods while remaining responsive to cancellation, but it can also call blocking socket I/O methods.
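Since the listing itself is not reproduced in this section, the following is a minimal sketch of the pattern as described, not the original code. CancellableTask and CancellingExecutor follow the description above; RecordingTask and NewTaskForDemo are hypothetical stand-ins in which the custom cancel method merely records that it ran, where a socket-using task would close its socket.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

interface CancellableTask<T> extends Callable<T> {
    void cancel();                   // task-specific cancellation action
    RunnableFuture<T> newTask();     // factory for the Future representing this task
}

final class CancellingExecutor extends ThreadPoolExecutor {
    CancellingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof CancellableTask) {
            return ((CancellableTask<T>) callable).newTask();   // task supplies its Future
        }
        return super.newTaskFor(callable);
    }
}

// Hypothetical task: cancel() only records that it ran.
final class RecordingTask implements CancellableTask<String> {
    final AtomicBoolean cleanupRan = new AtomicBoolean(false);
    final CountDownLatch started = new CountDownLatch(1);

    @Override
    public String call() throws InterruptedException {
        started.countDown();
        Thread.sleep(60_000);        // stands in for blocking work
        return "done";
    }

    @Override
    public void cancel() { cleanupRan.set(true); }

    @Override
    public RunnableFuture<String> newTask() {
        return new FutureTask<String>(this) {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                boolean result;
                try {
                    RecordingTask.this.cancel();                 // nonstandard part
                } finally {
                    result = super.cancel(mayInterruptIfRunning); // standard part
                }
                return result;
            }
        };
    }
}

final class NewTaskForDemo {
    /** Returns {Future.cancel result, custom cancel ran, isCancelled()}. */
    static boolean[] cancelThroughFuture() {
        CancellingExecutor exec = new CancellingExecutor(1);
        RecordingTask task = new RecordingTask();
        try {
            Future<String> f = exec.submit(task);
            task.started.await();
            boolean cancelled = f.cancel(true);
            return new boolean[] { cancelled, task.cleanupRan.get(), f.isCancelled() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        } finally {
            exec.shutdownNow();
        }
    }
}
```

Callers see only an ordinary Future; the extra cleanup happens inside cancel, so the nonstandard cancellation stays encapsulated in the task.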

