The Executor Framework

Tasks are logical units of work, and threads are a mechanism by which tasks can run asynchronously. We've examined two policies for executing tasks using threads: execute tasks sequentially in a single thread, and execute each task in its own thread. Both have serious limitations: the sequential approach suffers from poor responsiveness and throughput, and the thread-per-task approach suffers from poor resource management.

In Chapter 5, we saw how to use bounded queues to prevent an overloaded application from running out of memory. Thread pools offer the same benefit for thread management, and java.util.concurrent provides a flexible thread pool implementation as part of the Executor framework. The primary abstraction for task execution in the Java class libraries is not Thread, but Executor, shown in Listing 6.3.

Listing 6.3. Executor Interface.

public interface Executor {
    void execute(Runnable command);
}

Executor may be a simple interface, but it forms the basis for a flexible and powerful framework for asynchronous task execution that supports a wide variety of task execution policies. It provides a standard means of decoupling task submission from task execution, describing tasks with Runnable. The Executor implementations also provide lifecycle support and hooks for adding statistics gathering, application management, and monitoring.

Executor is based on the producer-consumer pattern, where activities that submit tasks are the producers (producing units of work to be done) and the threads that execute tasks are the consumers (consuming those units of work). Using an Executor is usually the easiest path to implementing a producer-consumer design in your application.
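As a minimal sketch of this decoupling, the submitting code below depends only on the Executor interface, so the same submission loop runs unchanged under two different execution policies. The names SubmissionDemo and submitAll are hypothetical, introduced only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; not one of the chapter's listings.
class SubmissionDemo {
    // Producer side: knows only about Executor, not about threads.
    static List<String> submitAll(Executor exec, int nTasks)
            throws InterruptedException {
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(nTasks);
        for (int i = 0; i < nTasks; i++) {
            final int id = i;
            exec.execute(() -> {
                // Consumer side: whichever thread the Executor chose runs this.
                results.add("task-" + id + " on "
                        + Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await(); // wait until every submitted task has run
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            submitAll(pool, 8).forEach(System.out::println);
        } finally {
            pool.shutdown();
        }
        // Same submission code, different policy: run in the caller's thread.
        submitAll(Runnable::run, 3).forEach(System.out::println);
    }
}
```

Only the argument passed to submitAll changes between the two calls; the code that produces tasks is untouched.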

6.2.1. Example: Web Server Using Executor

Building a web server with an Executor is easy. TaskExecutionWebServer in Listing 6.4 replaces the hard-coded thread creation with an Executor. In this case, we use one of the standard Executor implementations, a fixed-size thread pool with 100 threads.

In TaskExecutionWebServer, submission of the request-handling task is decoupled from its execution using an Executor, and its behavior can be changed merely by substituting a different Executor implementation. Changing Executor implementations or configuration is far less invasive than changing the way tasks are submitted; Executor configuration is generally a one-time event and can easily be exposed for deployment-time configuration, whereas task submission code tends to be strewn throughout the program and harder to expose.

Listing 6.4. Web Server Using a Thread Pool.

class TaskExecutionWebServer {
    private static final int NTHREADS = 100;
    private static final Executor exec
        = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }
}

We can easily modify TaskExecutionWebServer to behave like ThreadPerTaskWebServer by substituting an Executor that creates a new thread for each request. Writing such an Executor is trivial, as shown in ThreadPerTaskExecutor in Listing 6.5.

Listing 6.5. Executor that Starts a New Thread for Each Task.

public class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}

Similarly, it is also easy to write an Executor that would make TaskExecutionWebServer behave like the single-threaded version, executing each task synchronously before returning from execute, as shown in WithinThreadExecutor in Listing 6.6.

6.2.2. Execution Policies

The value of decoupling submission from execution is that it lets you easily specify, and subsequently change without great difficulty, the execution policy for a given class of tasks. An execution policy specifies the "what, where, when, and how" of task execution, including:

Listing 6.6. Executor that Executes Tasks Synchronously in the Calling Thread.

public class WithinThreadExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

Execution policies are a resource management tool, and the optimal policy depends on the available computing resources and your quality-of-service requirements. By limiting the number of concurrent tasks, you can ensure that the application does not fail due to resource exhaustion or suffer performance problems due to contention for scarce resources.[3] Separating the specification of execution policy from task submission makes it practical to select an execution policy at deployment time that is matched to the available hardware.

[3] This is analogous to one of the roles of a transaction monitor in an enterprise application: it can throttle the rate at which transactions are allowed to proceed so as not to exhaust or overstress limited resources.
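One way to select an execution policy at deployment time is to read the pool size from a system property and fall back to a value derived from the hardware. The sketch below assumes a hypothetical property name, server.poolSize, and hypothetical class and method names; it is an illustration of the idea, not a recommended sizing rule.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; the property name is an assumption.
class PolicyConfig {
    static final String POOL_SIZE_PROPERTY = "server.poolSize";

    // Uses -Dserver.poolSize=N if present and parseable,
    // otherwise the number of available processors.
    static int configuredPoolSize() {
        int defaultSize = Runtime.getRuntime().availableProcessors();
        int size = Integer.getInteger(POOL_SIZE_PROPERTY, defaultSize);
        return Math.max(1, size); // a pool needs at least one thread
    }

    static ExecutorService newConfiguredExecutor() {
        return Executors.newFixedThreadPool(configuredPoolSize());
    }

    public static void main(String[] args) {
        ExecutorService exec = newConfiguredExecutor();
        exec.execute(() ->
            System.out.println("pool size = " + configuredPoolSize()));
        exec.shutdown();
    }
}
```

Because task submission code sees only the returned executor, changing the property changes the policy without touching any code that submits tasks.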

Whenever you see code of the form:

new Thread(runnable).start()  

and you think you might at some point want a more flexible execution policy, seriously consider replacing it with the use of an Executor.

 

6.2.3. Thread Pools

A thread pool, as its name suggests, manages a homogeneous pool of worker threads. A thread pool is tightly bound to a work queue holding tasks waiting to be executed. Worker threads have a simple life: request the next task from the work queue, execute it, and go back to waiting for another task.

Executing tasks in pool threads has a number of advantages over the thread-per-task approach. Reusing an existing thread instead of creating a new one amortizes thread creation and teardown costs over multiple requests. As an added bonus, since the worker thread often already exists at the time the request arrives, the latency associated with thread creation does not delay task execution, thus improving responsiveness. By properly tuning the size of the thread pool, you can have enough threads to keep the processors busy while not having so many that your application runs out of memory or thrashes due to competition among threads for resources.

The class library provides a flexible thread pool implementation along with some useful predefined configurations. You can create a thread pool by calling one of the static factory methods in Executors:

newFixedThreadPool. A fixed-size thread pool creates threads as tasks are submitted, up to the maximum pool size, and then attempts to keep the pool size constant (adding new threads if a thread dies due to an unexpected Exception).

newCachedThreadPool. A cached thread pool has more flexibility to reap idle threads when the current size of the pool exceeds the demand for processing, and to add new threads when demand increases, but places no bounds on the size of the pool.

newSingleThreadExecutor. A single-threaded executor creates a single worker thread to process tasks, replacing it if it dies unexpectedly. Tasks are guaranteed to be processed sequentially according to the order imposed by the task queue (FIFO, LIFO, priority order).[4]

[4] Single-threaded executors also provide sufficient internal synchronization to guarantee that any memory writes made by tasks are visible to subsequent tasks; this means that objects can be safely confined to the "task thread" even though that thread may be replaced with another from time to time.

newScheduledThreadPool. A fixed-size thread pool that supports delayed and periodic task execution, similar to Timer. (See Section 6.2.5.)

The newFixedThreadPool and newCachedThreadPool factories return instances of the general-purpose ThreadPoolExecutor, which can also be used directly to construct more specialized executors. We discuss thread pool configuration options in depth in Chapter 8.
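The following sketch exercises two of these factories. It shows that a fixed-size pool runs many tasks on a bounded set of reused worker threads, and that a single-threaded executor runs tasks in submission order. PoolReuseDemo and its method names are hypothetical, chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not one of the chapter's listings.
class PoolReuseDemo {
    // Runs nTasks on a fixed pool; returns the names of the threads used.
    static Set<String> workerNames(int poolSize, int nTasks)
            throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < nTasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return names;
    }

    // Runs nTasks on a single-threaded executor; returns the execution order.
    static List<Integer> sequentialOrder(int nTasks)
            throws InterruptedException {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (int i = 0; i < nTasks; i++) {
            final int id = i;
            single.execute(() -> order.add(id));
        }
        single.shutdown();
        single.awaitTermination(10, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("distinct workers: " + workerNames(3, 100).size());
        System.out.println("execution order: " + sequentialOrder(5));
    }
}
```

However many tasks are submitted, the number of distinct worker names never exceeds the pool size.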

The web server in TaskExecutionWebServer uses an Executor with a bounded pool of worker threads. Submitting a task with execute adds the task to the work queue, and the worker threads repeatedly dequeue tasks from the work queue and execute them.

Switching from a thread-per-task policy to a pool-based policy has a big effect on application stability: the web server will no longer fail under heavy load.[5] It also degrades more gracefully, since it does not create thousands of threads that compete for limited CPU and memory resources. And using an Executor opens the door to all sorts of additional opportunities for tuning, management, monitoring, logging, error reporting, and other possibilities that would have been far more difficult to add without a task execution framework.

[5] While the server may not fail due to the creation of too many threads, if the task arrival rate exceeds the task service rate for long enough it is still possible (just harder) to run out of memory because of the growing queue of Runnables awaiting execution. This can be addressed within the Executor framework by using a bounded work queue; see Section 8.3.2.

6.2.4. Executor Lifecycle

We've seen how to create an Executor but not how to shut one down. An Executor implementation is likely to create threads for processing tasks. But the JVM can't exit until all the (nondaemon) threads have terminated, so failing to shut down an Executor could prevent the JVM from exiting.

Because an Executor processes tasks asynchronously, at any given time the state of previously submitted tasks is not immediately obvious. Some may have completed, some may be currently running, and others may be queued awaiting execution. In shutting down an application, there is a spectrum from graceful shutdown (finish what you've started but don't accept any new work) to abrupt shutdown (turn off the power to the machine room), and various points in between. Since Executors provide a service to applications, they should be able to be shut down as well, both gracefully and abruptly, and feed back information to the application about the status of tasks that were affected by the shutdown.

To address the issue of execution service lifecycle, the ExecutorService interface extends Executor, adding a number of methods for lifecycle management (as well as some convenience methods for task submission). The lifecycle management methods of ExecutorService are shown in Listing 6.7.

Listing 6.7. Lifecycle Methods in ExecutorService.

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    // ... additional convenience methods for task submission
}

The lifecycle implied by ExecutorService has three states: running, shutting down, and terminated. ExecutorServices are initially created in the running state. The shutdown method initiates a graceful shutdown: no new tasks are accepted but previously submitted tasks are allowed to complete, including those that have not yet begun execution. The shutdownNow method initiates an abrupt shutdown: it attempts to cancel outstanding tasks and does not start any tasks that are queued but not begun.

Tasks submitted to an ExecutorService after it has been shut down are handled by the rejected execution handler (see Section 8.3.3), which might silently discard the task or might cause execute to throw the unchecked RejectedExecutionException. Once all tasks have completed, the ExecutorService transitions to the terminated state. You can wait for an ExecutorService to reach the terminated state with awaitTermination, or poll for whether it has yet terminated with isTerminated. It is common to follow shutdown immediately by awaitTermination, creating the effect of synchronously shutting down the ExecutorService. (Executor shutdown and task cancellation are covered in more detail in Chapter 7.)
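The shutdown-then-awaitTermination idiom might be sketched as follows. The helper shutdownAndAwait and the class ShutdownDemo are hypothetical names; falling back to shutdownNow on timeout is one possible choice, not something the lifecycle requires.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not one of the chapter's listings.
class ShutdownDemo {
    // Graceful shutdown first; abrupt shutdown if the timeout expires.
    static boolean shutdownAndAwait(ExecutorService exec,
                                    long timeout, TimeUnit unit) {
        exec.shutdown(); // reject new tasks; already-submitted tasks still run
        try {
            if (exec.awaitTermination(timeout, unit))
                return true;
            // Timed out: cancel running tasks, collect those never started.
            List<Runnable> neverStarted = exec.shutdownNow();
            System.err.println(neverStarted.size() + " queued tasks never started");
            return exec.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            exec.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++)
            exec.execute(completed::incrementAndGet);

        boolean terminated = shutdownAndAwait(exec, 5, TimeUnit.SECONDS);
        System.out.println("terminated=" + terminated
                + ", completed=" + completed.get());

        try {
            exec.execute(() -> { }); // submitted after shutdown
        } catch (RejectedExecutionException e) {
            // Pools created by the Executors factories reject by throwing.
            System.out.println("rejected after shutdown");
        }
    }
}
```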

LifecycleWebServer in Listing 6.8 extends our web server with lifecycle support. It can be shut down in two ways: programmatically by calling stop, and through a client request by sending the web server a specially formatted HTTP request.

Listing 6.8. Web Server with Shutdown Support.

class LifecycleWebServer {
    private final ExecutorService exec = ...;

    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exec.isShutdown()) {
            try {
                final Socket conn = socket.accept();
                exec.execute(new Runnable() {
                    public void run() { handleRequest(conn); }
                });
            } catch (RejectedExecutionException e) {
                if (!exec.isShutdown())
                    log("task submission rejected", e);
            }
        }
    }

    public void stop() { exec.shutdown(); }

    void handleRequest(Socket connection) {
        Request req = readRequest(connection);
        if (isShutdownRequest(req))
            stop();
        else
            dispatchRequest(req);
    }
}

6.2.5. Delayed and Periodic Tasks

The Timer facility manages the execution of deferred ("run this task in 100 ms") and periodic ("run this task every 10 ms") tasks. However, Timer has some drawbacks, and ScheduledThreadPoolExecutor should be thought of as its replacement.[6] You can construct a ScheduledThreadPoolExecutor through its constructor or through the newScheduledThreadPool factory.

[6] Timer does have support for scheduling based on absolute, not relative time, so that tasks can be sensitive to changes in the system clock; ScheduledThreadPoolExecutor supports only relative time.

A Timer creates only a single thread for executing timer tasks. If a timer task takes too long to run, the timing accuracy of other TimerTasks can suffer. If a recurring TimerTask is scheduled to run every 10 ms and another TimerTask takes 40 ms to run, the recurring task either (depending on whether it was scheduled at fixed rate or fixed delay) gets called four times in rapid succession after the long-running task completes, or "misses" four invocations completely. Scheduled thread pools address this limitation by letting you provide multiple threads for executing deferred and periodic tasks.
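To illustrate, the sketch below gives a scheduled thread pool two threads, occupies one of them with a task that blocks, and checks that a task scheduled every 10 ms still runs on the other thread. ScheduledPoolDemo and periodicTaskKeepsRunning are hypothetical names; the blocking task stands in for any long-running timer task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not one of the chapter's listings.
class ScheduledPoolDemo {
    // Returns true if the periodic task ran `ticks` times while a
    // long-running task was occupying another pool thread.
    static boolean periodicTaskKeepsRunning(int ticks)
            throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        CountDownLatch ticked = new CountDownLatch(ticks);
        CountDownLatch releaseSlowTask = new CountDownLatch(1);

        // Stand-in for a long-running task: blocks until released.
        Runnable slowTask = () -> {
            try {
                releaseSlowTask.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        try {
            scheduler.schedule(slowTask, 0, TimeUnit.MILLISECONDS);
            scheduler.scheduleAtFixedRate(ticked::countDown,
                    10, 10, TimeUnit.MILLISECONDS);
            // The slow task is still blocked while we wait for the ticks.
            return ticked.await(5, TimeUnit.SECONDS);
        } finally {
            releaseSlowTask.countDown();
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("periodic task kept running: "
                + periodicTaskKeepsRunning(5));
    }
}
```

With a pool size of one, the same periodic task would be stuck behind the blocked task, which is the situation a single-threaded Timer is always in.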

Another problem with Timer is that it behaves poorly if a TimerTask throws an unchecked exception. The Timer thread doesn't catch the exception, so an unchecked exception thrown from a TimerTask terminates the timer thread. Timer also doesn't resurrect the thread in this situation; instead, it erroneously assumes the entire Timer was cancelled. In this case, TimerTasks that are already scheduled but not yet executed are never run, and new tasks cannot be scheduled. (This problem, called "thread leakage," is described in Section 7.3, along with techniques for avoiding it.)

OutOfTime in Listing 6.9 illustrates how a Timer can become confused in this manner and, as confusion loves company, how the Timer shares its confusion with the next hapless caller that tries to submit a TimerTask. You might expect the program to run for six seconds and exit, but what actually happens is that it terminates after one second with an IllegalStateException whose message text is "Timer already cancelled". ScheduledThreadPoolExecutor deals properly with ill-behaved tasks; there is little reason to use Timer in Java 5.0 or later.
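The behavior can be reproduced with a short sketch along the following lines. This is not the text of Listing 6.9; TimerConfusionDemo and its method names are hypothetical, and the delays are shortened to milliseconds. It schedules a TimerTask that throws, then shows that a later submission to the same Timer fails, while a ScheduledThreadPoolExecutor given the same ill-behaved task continues to accept and run work.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not the book's OutOfTime listing.
class TimerConfusionDemo {
    // Returns true if the Timer refuses new tasks after one task has thrown.
    static boolean timerRejectsAfterFailure() throws InterruptedException {
        Timer timer = new Timer(true); // daemon thread, so the JVM can exit
        timer.schedule(new TimerTask() {
            @Override public void run() {
                throw new RuntimeException("ill-behaved task");
            }
        }, 1);
        Thread.sleep(500); // give the timer thread time to run the task and die
        try {
            timer.schedule(new TimerTask() {
                @Override public void run() { }
            }, 1);
            return false;
        } catch (IllegalStateException e) {
            return true; // the Timer believes it was cancelled
        }
    }

    // Returns true if a scheduled pool still runs tasks after one has thrown.
    static boolean schedulerSurvivesFailure() throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            Runnable illBehaved = () -> {
                throw new RuntimeException("ill-behaved task");
            };
            scheduler.schedule(illBehaved, 1, TimeUnit.MILLISECONDS);
            // A Callable scheduled afterward still runs and returns its value.
            String result = scheduler
                    .schedule(() -> "still running", 10, TimeUnit.MILLISECONDS)
                    .get(5, TimeUnit.SECONDS);
            return "still running".equals(result);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Timer rejects new tasks: " + timerRejectsAfterFailure());
        System.out.println("Scheduler survives: " + schedulerSurvivesFailure());
    }
}
```

In the scheduled pool, the exception is captured in the Future returned by schedule rather than killing the worker thread, so it surfaces only if a caller asks that Future for its result.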

If you need to build your own scheduling service, you may still be able to take advantage of the library by using a DelayQueue, a BlockingQueue implementation that provides the scheduling functionality of ScheduledThreadPoolExecutor. A DelayQueue manages a collection of Delayed objects. A Delayed has a delay time associated with it: DelayQueue lets you take an element only if its delay has expired. Objects are returned from a DelayQueue ordered by the time associated with their delay.
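A minimal sketch of a Delayed element and its use with DelayQueue follows. DelayedMessage, DelayQueueDemo, and the specific delays are hypothetical, chosen for illustration. Elements are inserted out of order and taken back in order of expiry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not one of the chapter's listings.
class DelayQueueDemo {
    static final class DelayedMessage implements Delayed {
        final String text;
        private final long readyAtNanos;

        DelayedMessage(String text, long delayMillis) {
            this.text = text;
            this.readyAtNanos = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        // Remaining delay; zero or negative means the element has expired.
        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(),
                    TimeUnit.NANOSECONDS);
        }

        // Orders elements so that the one expiring soonest comes first.
        @Override public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                    other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    static List<String> drainInDelayOrder() throws InterruptedException {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("third", 150));
        queue.put(new DelayedMessage("first", 50));
        queue.put(new DelayedMessage("second", 100));

        List<String> taken = new ArrayList<>();
        for (int i = 0; i < 3; i++)
            taken.add(queue.take().text); // blocks until the head has expired
        return taken;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drainInDelayOrder());
    }
}
```

A scheduling service built this way would typically have one or more worker threads looping on take and running whatever each expired element describes.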

Finding Exploitable Parallelism
