Notifying One Thread from Another

Problem

You are using a pattern where one thread (or group of threads) does something and it needs to let another thread (or group of threads) know about it. You may have a master thread that is handing out work to slave threads, or you may use one group of threads to populate a queue and another to remove the data from it and do something useful.

Solution

Use mutex and condition objects, declared in boost/thread/mutex.hpp and boost/thread/condition.hpp. You can create a condition for each situation you want threads to wait for, and notify any waiting threads on the condition. Example 12-4 shows how to use signaling in a master/slave threading model.

Example 12-4. Signaling between threads

#include #include #include #include #include #include class Request { /*...*/ }; // A simple job queue class; don't do this, use std::queue template class JobQueue { public: JobQueue( ) {} ~JobQueue( ) {} void submitJob(const T& x) { boost::mutex::scoped_lock lock(mutex_); list_.push_back(x); workToBeDone_.notify_one( ); } T getJob( ) { boost::mutex::scoped_lock lock(mutex_); workToBeDone_.wait(lock); // Wait until this condition is // satisfied, then lock the mutex T tmp = list_.front( ); list_.pop_front( ); return(tmp); } private: std::list list_; boost::mutex mutex_; boost::condition workToBeDone_; }; JobQueue myJobQueue; void boss( ) { for (;;) { // Get the request from somewhere Request req; myJobQueue.submitJob(req); } } void worker( ) { for (;;) { Request r(myJobQueue.getJob( )); // Do something with the job... } } int main( ) { boost::thread thr1(boss); boost::thread thr2(worker); boost::thread thr3(worker); thr1.join( ); thr2.join( ); thr3.join( ); }

 

Discussion

A condition object uses a mutex, and lets you wait for a situation other than its becoming unlocked. Consider Example 12-4, which is a modified version of the Queue class presented in Example 12-2. I have modified Queue to be a specific kind of queue, namely a JobQueue, where objects representing jobs are submitted by a master thread and are retrieved by slave threads.

The most important change for the JobQueue class is the condition member variable workToBeDone_. This is a condition that indicates whether or not there is work in the queue. When a thread wants to retrieve an element from the job queue, it calls getJob, which tries to acquire a lock on the mutex and then waits for the new condition with the following lines:

boost::mutex::scoped_lock lock(mutex_); workToBeDone_.wait(lock);

The first line locks the mutex in the usual manner. The second line then unlocks the mutex and waits, or goes to sleep, until the condition is met. The unlocking of the mutex allows other threads to use that mutexone of them might need it to set up the condition we are waiting forotherwise, other threads would be unable to lock the mutex while one thread was waiting on the condition.

In submitJob, after the job has been added to the internal list, I added the following line:

workToBeDone_.notify_one( );

This "satisfies" the condition that getJob is waiting for. Technically, this means that if there are any threads who have called wait on this condition, that one of them is put in a run state. In getJob, that means that execution continues at the following line:

workToBeDone_.wait(lock);

But not just yet. wait does two things: it waits until someone calls notify_one or notify_all on the condition that it's waiting on, then it tries to lock the mutex it's associated with. So what actually happens when submitJob calls notify_all is that the waiting thread is put in a run state and the next thing it does is try to lock the mutex that submitJob still has locked, so it goes back into a wait state until submitJob is complete. Thus, condition::wait requires that the mutex be locked when you call it, at which point it is unlocked, then locked again when the condition is met.

Notify all threads that are waiting for some condition to be true by calling notify_all. This works the same way as notify_one, except that all threads that are waiting on the condition are changed to a run state. They all try and acquire the next lock though, so what happens after that depends on the kind of mutex and the type of locks used.

A condition gives you something subtle that you don't get when you are using mutexes and locks alone. Consider the case of the Queue class presented earlier. Threads waiting to dequeue something wait until they can acquire a write lock, then pop the next item off the queue. This may appear to work fine without any sort of signaling mechanism, but does it really? What about when the queue is empty? You have a few choices for how you implement dequeue if you are waiting for a condition to become true: acquire the lock; check to see if there are items in the queue or not, if not, return; use another mutex that is locked when the queue is empty and unlocked when it has data (not a good idea); or return a special value when the queue is empty. These are either problematic or inefficient. If you simply return when the queue is empty by throwing an exception or returning a special value, then your clients have to keep checking to see when something arrives. This is a needless drain on resources.

A condition lets consumer threads sleep so the processor can do something else while a condition is not met. Imagine a web server that uses a pool of worker threads to handle incoming requests. It is far better to have child threads in a wait state when there is no activity then to have them looping, or sleeping and waking up occasionally to check the queue.

Категории