Limiting Multithreading with a Thread Pool

Problem

You want to process multiple requests in parallel, but you don't necessarily want to run all the requests simultaneously. Using a technique like that in Recipe 20.6 can create a huge number of threads running at once, slowing down the average response time. You want to set a limit on the number of simultaneously running threads.

Solution

You want a thread pool. If you're writing an Internet server and you want to service requests in parallel, you should build your code on top of the gserver module, as seen in Recipe 14.14: it has a thread pool and many TCP/IP-specific features. Otherwise, here's a generic ThreadPool class, based on code from gserver.

The instance variable @pool contains the active threads. The Mutex and the ConditionVariable are used to control the addition of threads to the pool, so that the pool never contains more than @max_size threads:

    require 'thread'

    class ThreadPool
      def initialize(max_size)
        @pool = []
        @max_size = max_size
        @pool_mutex = Mutex.new
        @pool_cv = ConditionVariable.new
      end

When a thread wants to enter the pool, but the pool is full, the thread puts itself to sleep by calling ConditionVariable#wait. When a thread in the pool finishes executing, it removes itself from the pool and calls ConditionVariable#signal to wake up the first sleeping thread:

      def dispatch(*args)
        Thread.new do
          # Wait for space in the pool.
          @pool_mutex.synchronize do
            while @pool.size >= @max_size
              print "Pool is full; waiting to run #{args.join(',')}…\n" if $DEBUG
              # Sleep until some other thread calls @pool_cv.signal.
              @pool_cv.wait(@pool_mutex)
            end
            # Claim the free slot before releasing the mutex, so that two
            # threads can't both take the last space.
            @pool << Thread.current
          end

The newly awakened thread adds itself to the pool before it releases the mutex. It then runs its code and, when it finishes, calls ConditionVariable#signal to wake up the next sleeping thread:

          begin
            yield(*args)
          rescue => e
            exception(Thread.current, e, *args)
          ensure
            @pool_mutex.synchronize do
              # Remove the thread from the pool.
              @pool.delete(Thread.current)
              # Signal the next waiting thread that there's a space in the pool.
              @pool_cv.signal
            end
          end
        end
      end

      def shutdown
        @pool_mutex.synchronize { @pool_cv.wait(@pool_mutex) until @pool.empty? }
      end

      def exception(thread, exception, *original_args)
        # Override this method in a subclass to handle an exception within a thread.
        puts "Exception in thread #{thread}: #{exception}"
      end
    end

Here's a simulation of five incoming jobs that take different times to run. The pool ensures no more than three jobs run at a time. The job code doesn't need to know anything about threads or thread pools; that's all handled by ThreadPool#dispatch.

    $DEBUG = true
    pool = ThreadPool.new(3)

    1.upto(5) do |i|
      pool.dispatch(i) do |i|
        print "Job #{i} started.\n"
        sleep(5-i)
        print "Job #{i} complete.\n"
      end
    end
    # Job 1 started.
    # Job 3 started.
    # Job 2 started.
    # Pool is full; waiting to run 4…
    # Pool is full; waiting to run 5…
    # Job 3 complete.
    # Job 4 started.
    # Job 2 complete.
    # Job 5 started.
    # Job 5 complete.
    # Job 4 complete.
    # Job 1 complete.

    pool.shutdown

Discussion

When should you use a thread pool, and when should you just send a swarm of threads after the problem? Consider why this pattern is so common in Internet servers that it's built into Ruby's gserver library. Internet server requests are usually I/O bound, because most servers operate on the filesystem or a database. If you run high-latency requests in parallel (like requests for filesystem files), you can complete multiple requests in about the same time it would take to complete a single request.
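To get a feel for that claim, here's a rough sketch that times four simulated requests run one after another and then in parallel. The fake_request and elapsed helpers are hypothetical names invented for this illustration, and sleep stands in for real I/O latency, so treat the numbers as indicative only:

```ruby
# Stand-in for a high-latency request (hypothetical helper). Like blocking
# I/O, sleep lets other Ruby threads run while this one waits.
def fake_request(latency)
  sleep(latency)
end

# Wall-clock seconds taken by the block, measured with a monotonic clock.
def elapsed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

latency = 0.2

# Four requests, one after another.
serial = elapsed { 4.times { fake_request(latency) } }

# The same four requests, each in its own thread.
parallel = elapsed do
  4.times.map { Thread.new { fake_request(latency) } }.each(&:join)
end

printf("serial: %.2fs, parallel: %.2fs\n", serial, parallel)
```

The parallel run should take roughly as long as a single request, while the serial run takes about four times that. A CPU-bound job would not show the same gain.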

But Internet server requests can use a lot of memory, and any random user on the Internet can trigger a job on your server. If you create and start a thread for every incoming request, it's easy to run out of resources. You need to find a tradeoff between the performance benefit of multithreading and the performance hazard of thrashing due to insufficient resources. The simplest way to do this is to limit the number of requests that can be processed at a given time.

A thread pool isn't a connection pool, like you might see with a database. Database connections are often pooled because they're expensive to create. Threads are pretty cheap; we just don't want a lot of them actively running at once. The example in the Solution creates five threads at once, but only three of them can be active at any one time. The rest are asleep, waiting for a notification from the condition variable @pool_cv.
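For contrast, here's a sketch of a different technique from the one in the Solution: a fixed set of worker threads that pull jobs from a Queue, closer in spirit to a connection pool. It isn't the recipe's design, and the job itself (squaring a number) and the nil stop marker are assumptions made for the illustration:

```ruby
require 'thread'  # harmless on modern Ruby, where Queue is built in

jobs    = Queue.new
results = Queue.new

# Only three threads ever exist; each one handles many jobs in turn.
workers = 3.times.map do
  Thread.new do
    # Queue#pop blocks until a job arrives. A nil job is our stop marker.
    while (job = jobs.pop)
      results << job * job
    end
  end
end

1.upto(5) { |i| jobs << i }
workers.size.times { jobs << nil }  # one stop marker per worker
workers.each(&:join)

# Every worker has exited, so draining the results queue can't block.
squares = []
squares << results.pop until results.empty?
p squares.sort
# [1, 4, 9, 16, 25]
```

Here the limit comes from the number of workers rather than from a condition variable. The Solution's approach spends a sleeping thread on every waiting job, but it lets each job carry its own block.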

Calling ThreadPool#dispatch with a code block creates a new thread that runs the code block, but not until it finds a free slot in the thread pool. Until then, it's waiting on the condition variable @pool_cv. When one of the threads in the pool completes its code block, it calls signal on the condition variable, waking up the first thread currently waiting on it.
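The wait-and-signal handshake can be isolated in a smaller sketch. The Gate class below is a hypothetical helper, not part of the recipe: it admits at most a fixed number of callers at once and records the highest number it ever saw inside, which makes the limit easy to check:

```ruby
require 'thread'  # harmless on modern Ruby, where Mutex is built in

# Hypothetical helper: lets at most `limit` callers run their block at once.
class Gate
  attr_reader :peak

  def initialize(limit)
    @limit  = limit
    @inside = 0
    @peak   = 0
    @mutex  = Mutex.new
    @cv     = ConditionVariable.new
  end

  def enter
    @mutex.synchronize do
      # wait releases the mutex while asleep and reacquires it on wakeup.
      # The loop rechecks the condition after every wakeup.
      @cv.wait(@mutex) while @inside >= @limit
      @inside += 1
      @peak = @inside if @inside > @peak
    end
    begin
      yield
    ensure
      @mutex.synchronize do
        @inside -= 1
        @cv.signal  # wake one waiting caller, if there is one
      end
    end
  end
end

gate = Gate.new(2)
threads = 6.times.map do
  Thread.new { gate.enter { sleep 0.05 } }
end
threads.each(&:join)

puts "Peak concurrency: #{gate.peak}"
```

Six threads compete for two slots, and the reported peak never exceeds the limit. The important detail is that the check and the increment happen under the same lock.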

The shutdown method waits for running jobs to complete by repeatedly waiting on the condition variable until the pool is empty.
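Because @pool holds only the jobs that are currently running, you may prefer to count jobs from the moment they're dispatched if you need to wait for queued jobs as well. Here's one possible sketch of that idea. The PendingCounter class and its track and wait_all methods are hypothetical names, and the sketch leaves out the pool's size limit to stay short:

```ruby
require 'thread'  # harmless on modern Ruby, where Mutex is built in

# Hypothetical helper: counts jobs that have been handed out but not finished.
class PendingCounter
  def initialize
    @pending = 0
    @mutex   = Mutex.new
    @cv      = ConditionVariable.new
  end

  # Runs the block in a new thread. The job is counted before the thread
  # starts, so wait_all can't miss a job that hasn't begun running yet.
  def track
    @mutex.synchronize { @pending += 1 }
    Thread.new do
      begin
        yield
      ensure
        @mutex.synchronize do
          @pending -= 1
          @cv.broadcast  # wake every waiter so each can recheck the count
        end
      end
    end
  end

  # Blocks until every tracked job has finished.
  def wait_all
    @mutex.synchronize { @cv.wait(@mutex) until @pending.zero? }
  end
end

done    = Queue.new
tracker = PendingCounter.new
3.times { |i| tracker.track { sleep(0.01 * i); done << i } }
tracker.wait_all

puts "Jobs finished: #{done.size}"
# Jobs finished: 3
```

The sketch uses broadcast rather than signal so that a wakeup is never spent on a waiter whose condition isn't yet true.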

See Also
