Producer/Consumer Relationship with Thread Synchronization
Figures 15.9 and 15.10 demonstrate a producer and a consumer accessing a shared cell of memory with synchronization, so that the consumer consumes only after the producer produces a value, and the producer produces a new value only after the consumer consumes the previous one. This example reuses interface Buffer (Fig. 15.4) and classes Producer (Fig. 15.5) and Consumer (Fig. 15.6) from the previous example. [Note: In this example, we demonstrate synchronization with class Monitor's Enter and Exit methods. In the next example, we demonstrate the same concepts via a lock block.]
Figure 15.9. SynchronizedBuffer synchronizes access to a shared integer.
  1  // Fig. 15.9: SynchronizedBuffer.cs
  2  // A synchronized shared buffer implementation.
  3  using System;
  4  using System.Threading;
  5
  6  // this class represents a single shared int
  7  public class SynchronizedBuffer : Buffer
  8  {
  9     // buffer shared by producer and consumer threads
 10     private int buffer = -1;
 11
 12     // occupiedBufferCount maintains count of occupied buffers
 13     private int occupiedBufferCount = 0;
 14
 15     // property Buffer
 16     public int Buffer
 17     {
 18        get
 19        {
 20           // obtain lock on this object
 21           Monitor.Enter( this );
 22
 23           // if there is no data to read, place invoking
 24           // thread in WaitSleepJoin state
 25           if ( occupiedBufferCount == 0 )
 26           {
 27              Console.WriteLine(
 28                 Thread.CurrentThread.Name + " tries to read." );
 29              DisplayState( "Buffer empty. " +
 30                 Thread.CurrentThread.Name + " waits." );
 31              Monitor.Wait( this ); // enter WaitSleepJoin state
 32           } // end if
 33
 34           // indicate that producer can store another value
 35           // because consumer is about to retrieve a buffer value
 36           --occupiedBufferCount;
 37
 38           DisplayState( Thread.CurrentThread.Name + " reads " + buffer );
 39
 40           // tell waiting thread (if there is one) to
 41           // become ready to execute (Running state)
 42           Monitor.Pulse( this );
 43
 44           // Get copy of buffer before releasing lock.
 45           // It is possible that the producer could be
 46           // assigned the processor immediately after the
 47           // monitor is released and before the return
 48           // statement executes. In this case, the producer
 49           // would assign a new value to buffer before the
 50           // return statement returns the value to the
 51           // consumer. Thus, the consumer would receive the
 52           // new value. Making a copy of buffer and
 53           // returning the copy ensures that the
 54           // consumer receives the proper value.
 55           int bufferCopy = buffer;
 56
 57           // release lock on this object
 58           Monitor.Exit( this );
 59
 60           return bufferCopy;
 61        } // end get
 62        set
 63        {
 64           // acquire lock for this object
 65           Monitor.Enter( this );
 66
 67           // if there are no empty locations, place invoking
 68           // thread in WaitSleepJoin state
 69           if ( occupiedBufferCount == 1 )
 70           {
 71              Console.WriteLine(
 72                 Thread.CurrentThread.Name + " tries to write." );
 73              DisplayState( "Buffer full. " +
 74                 Thread.CurrentThread.Name + " waits." );
 75              Monitor.Wait( this ); // enter WaitSleepJoin state
 76           } // end if
 77
 78           // set new buffer value
 79           buffer = value;
 80
 81           // indicate consumer can retrieve another value
 82           // because producer has just stored a buffer value
 83           ++occupiedBufferCount;
 84
 85           DisplayState( Thread.CurrentThread.Name + " writes " + buffer );
 86
 87           // tell waiting thread (if there is one) to
 88           // become ready to execute (Running state)
 89           Monitor.Pulse( this );
 90
 91           // release lock on this object
 92           Monitor.Exit( this );
 93        } // end set
 94     } // end property Buffer
 95
 96     // display current operation and buffer state
 97     public void DisplayState( string operation )
 98     {
 99        Console.WriteLine( "{0,-35}{1,-9}{2} ",
100           operation, buffer, occupiedBufferCount );
101     } // end method DisplayState
102  } // end class SynchronizedBuffer
Figure 15.10. Producer and consumer threads accessing a shared object with synchronization.
  1  // Fig. 15.10: SynchronizedBufferTest.cs
  2  // Showing multiple threads modifying a shared object with
  3  // synchronization.
  4  using System;
  5  using System.Threading;
  6
  7  // this class creates producer and consumer threads
  8  class SynchronizedBufferTest
  9  {
 10     // create producer and consumer threads and start them
 11     static void Main( string[] args )
 12     {
 13        // create shared object used by threads
 14        SynchronizedBuffer shared = new SynchronizedBuffer();
 15
 16        // Random object used by each thread
 17        Random random = new Random();
 18
 19        // output column heads and initial buffer state
 20        Console.WriteLine( "{0,-35}{1,-9}{2} ",
 21           "Operation", "Buffer", "Occupied Count" );
 22        shared.DisplayState( "Initial state" );
 23
 24        // create Producer and Consumer objects
 25        Producer producer = new Producer( shared, random );
 26        Consumer consumer = new Consumer( shared, random );
 27
 28        // create threads for producer and consumer and set
 29        // delegates for each thread
 30        Thread producerThread =
 31           new Thread( new ThreadStart( producer.Produce ) );
 32        producerThread.Name = "Producer";
 33
 34        Thread consumerThread =
 35           new Thread( new ThreadStart( consumer.Consume ) );
 36        consumerThread.Name = "Consumer";
 37
 38        // start each thread
 39        producerThread.Start();
 40        consumerThread.Start();
 41     } // end Main
 42  } // end class SynchronizedBufferTest
Class SynchronizedBuffer (Fig. 15.9) implements interface Buffer and contains two instance variables: buffer (line 10) and occupiedBufferCount (line 13). Also, property Buffer's get (lines 18-61) and set (lines 62-93) accessors now use methods of class Monitor to synchronize access to instance variable buffer. Thus, each object of class SynchronizedBuffer has a SyncBlock to maintain synchronization. Instance variable occupiedBufferCount is known as a condition variable: property Buffer's accessors use this int in conditions to determine whether it is the producer's or the consumer's turn to perform a task. If occupiedBufferCount is 0, property Buffer's set accessor can place a value into variable buffer, because the variable currently does not contain information, but property Buffer's get accessor cannot read the value of buffer. If occupiedBufferCount is 1, property Buffer's get accessor can read a value from variable buffer, because the variable does contain information, but property Buffer's set accessor cannot place a value into buffer.
As in the previous example, the producer thread performs the tasks specified in the producer object's Produce method. When line 28 of Fig. 15.5 sets the value of sharedLocation's property Buffer, the producer thread invokes the set accessor at lines 62-93 of Fig. 15.9. Line 65 invokes Monitor method Enter with the argument this to acquire the lock on the SynchronizedBuffer object. The if statement (lines 69-76) determines whether occupiedBufferCount is 1. If this condition is true, lines 71-72 output a message indicating that the producer thread is trying to write a value, and lines 73-74 invoke method DisplayState (lines 97-101) to output another message indicating that the buffer is full and that the producer thread waits. Line 75 invokes Monitor method Wait with the argument this to place the calling thread (i.e., the producer) in the WaitSleepJoin state for the SynchronizedBuffer object. This also releases the lock on the SynchronizedBuffer object. The WaitSleepJoin state for an object is maintained by that object's SyncBlock. Now another thread can invoke an accessor method of the SynchronizedBuffer object's Buffer property.
The producer thread remains in the WaitSleepJoin state until the thread is notified by the consumer's call to Monitor method Pulse that it may proceed, at which point the thread returns to the Running state and waits for the system to assign a processor to the thread. When the thread returns to the Running state, the thread implicitly reacquires the lock on the SynchronizedBuffer object, and the set accessor continues executing with the next statement after Wait. Line 79 assigns value to buffer. Line 83 increments occupiedBufferCount to indicate that the shared buffer now contains a value (i.e., a consumer can read the value, and a producer cannot yet put another value there). Line 85 invokes method DisplayState to output a line to the console window indicating that the producer is writing a new value into the buffer. Line 89 invokes Monitor method Pulse with the SynchronizedBuffer object (this) as an argument. If there are any waiting threads in that object's SyncBlock, the first waiting thread enters the Running state, indicating that the thread can now attempt its task again (as soon as the thread is assigned a processor). The Pulse method returns immediately. Line 92 invokes Monitor method Exit to release the lock on the SynchronizedBuffer object, and the set accessor returns to its caller (i.e., the Produce method of the Producer).
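The handoff between Wait and Pulse described above can be observed in isolation. The following is a minimal sketch, not part of Figs. 15.9 and 15.10: class PulseOrderDemo, the lock object gate and the flag signaled are hypothetical names introduced for illustration. It records two events to show that Pulse returns immediately while the pulsing thread still holds the lock, and that the waiting thread continues past Wait only after the lock has been released.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class; not part of the chapter's figures.
public class PulseOrderDemo
{
    private static readonly object gate = new object(); // object whose lock is shared
    private static bool signaled;                        // condition the waiter checks
    private static List<string> log;                     // events, in the order they occur

    public static List<string> Run()
    {
        signaled = false;
        log = new List<string>();

        Thread waiter = new Thread( () =>
        {
            Monitor.Enter( gate );
            try
            {
                // Wait releases the lock on gate and reacquires it before returning
                while ( !signaled )
                    Monitor.Wait( gate );

                log.Add( "waiter resumed" );
            }
            finally
            {
                Monitor.Exit( gate );
            }
        } );
        waiter.Start();

        Monitor.Enter( gate );
        try
        {
            signaled = true;
            Monitor.Pulse( gate ); // returns immediately; this thread still holds the lock
            log.Add( "pulser still holds lock" );
        }
        finally
        {
            Monitor.Exit( gate ); // only now can the waiter reacquire the lock
        }

        waiter.Join();
        return log;
    }

    public static void Main()
    {
        foreach ( string entry in Run() )
            Console.WriteLine( entry );
    }
}
```

Because the waiter can add its entry only while holding the lock and only after signaled is true, "pulser still holds lock" is always logged before "waiter resumed", whichever thread acquires the lock first.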
The get and set accessors are implemented similarly. As in the previous example, the consumer thread performs the tasks specified in the consumer object's Consume method. The consumer thread gets the value of the SynchronizedBuffer object's Buffer property (line 30 of Fig. 15.6) by invoking the get accessor at lines 18-61 of Fig. 15.9. Line 21 invokes Monitor method Enter to acquire the lock on the SynchronizedBuffer object.
The if statement at lines 25-32 determines whether occupiedBufferCount is 0. If this condition is true, lines 27-28 output a message indicating that the consumer thread is trying to read a value, and lines 29-30 invoke method DisplayState to output another message indicating that the buffer is empty and that the consumer thread waits. Line 31 invokes Monitor method Wait with the argument this to place the calling thread (i.e., the consumer) in the WaitSleepJoin state for the SynchronizedBuffer object and to release the lock on the object. Now another thread can invoke an accessor method of the SynchronizedBuffer object's Buffer property.
The consumer thread remains in the WaitSleepJoin state until the thread is notified by the producer's call to Monitor method Pulse that it may proceed, at which point the thread returns to the Running state and waits for the system to assign a processor to the thread. When the thread re-enters the Running state, the thread implicitly reacquires the lock on the SynchronizedBuffer object, and the get accessor continues executing with the next statement after Wait. Line 36 decrements occupiedBufferCount to indicate that the shared buffer is now empty (i.e., a consumer cannot read the value, but a producer can place another value in the shared buffer), line 38 outputs a line to the console window indicating the value the consumer is reading, and line 42 invokes Monitor method Pulse with the SynchronizedBuffer object as an argument. If there are any waiting threads in that object's SyncBlock, the first waiting thread enters the Running state, indicating that the thread can now attempt its task again (as soon as the thread is assigned a processor). The Pulse method returns immediately. Line 55 gets a copy of buffer before releasing the lock. This is necessary because the producer could be assigned the processor immediately after the lock is released (line 58) and before the return statement executes (line 60). In this case, the producer would assign a new value to buffer before the return statement returns the value to the consumer, so the consumer would receive the new value instead of the one it was supposed to read. Making a copy of buffer and returning the copy ensures that the consumer receives the proper value. Line 58 invokes Monitor method Exit to release the lock on the SynchronizedBuffer object, and the get accessor returns bufferCopy to its caller.
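The two accessors walked through above can be condensed into the following sketch. It is a variation under stated assumptions rather than the code of Fig. 15.9: class SketchBuffer and its methods Put and Take are hypothetical, the output statements are omitted, the condition is rechecked in a while loop instead of an if statement, and the Monitor calls are wrapped in try/finally so that the lock is released even if an exception occurs. As in the figure, the value is copied while the lock is still held.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical single-cell buffer; a variation on Fig. 15.9, not the figure's code.
public class SketchBuffer
{
    private int buffer = -1;             // cell shared by producer and consumer
    private int occupiedBufferCount = 0; // 0 = empty, 1 = full

    public void Put( int value )
    {
        Monitor.Enter( this );
        try
        {
            // recheck the condition each time this thread resumes
            while ( occupiedBufferCount == 1 )
                Monitor.Wait( this ); // releases the lock while waiting

            buffer = value;
            ++occupiedBufferCount;
            Monitor.Pulse( this ); // notify a waiting consumer, if there is one
        }
        finally
        {
            Monitor.Exit( this ); // release the lock on every path
        }
    }

    public int Take()
    {
        Monitor.Enter( this );
        try
        {
            while ( occupiedBufferCount == 0 )
                Monitor.Wait( this );

            int bufferCopy = buffer; // copy while the lock is still held
            --occupiedBufferCount;
            Monitor.Pulse( this ); // notify a waiting producer, if there is one
            return bufferCopy;
        }
        finally
        {
            Monitor.Exit( this );
        }
    }
}

public class SketchBufferDemo
{
    // produce 1..count on one thread, consume on another, return the consumed values
    public static List<int> Run( int count )
    {
        SketchBuffer shared = new SketchBuffer();
        List<int> consumed = new List<int>();

        Thread producerThread = new Thread( () =>
        {
            for ( int i = 1; i <= count; i++ )
                shared.Put( i );
        } );
        Thread consumerThread = new Thread( () =>
        {
            for ( int i = 1; i <= count; i++ )
                consumed.Add( shared.Take() );
        } );

        producerThread.Start();
        consumerThread.Start();
        producerThread.Join();
        consumerThread.Join();
        return consumed;
    }

    public static void Main()
    {
        Console.WriteLine( string.Join( " ", Run( 10 ) ) ); // prints 1 2 3 4 5 6 7 8 9 10
    }
}
```

With one producer and one consumer, the if statement of Fig. 15.9 and the while loop of this sketch behave the same way; the loop is simply a defensive choice in case a thread resumes when its condition does not hold.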
Class SynchronizedBufferTest (Fig. 15.10) is nearly identical to class UnsynchronizedBufferTest (Fig. 15.8). SynchronizedBufferTest's Main method declares shared as an object of class SynchronizedBuffer (line 14) and also displays header information for the output (lines 20-22).
Study the two sample outputs in Fig. 15.10. Observe that every integer produced is consumed exactly once: no values are lost, and no values are consumed more than once. This occurs because the producer and consumer cannot perform tasks unless it is "their turn." The producer must go first; the consumer must wait if the producer has not produced since the consumer last consumed; and the producer must wait if the consumer has not yet consumed the value the producer most recently produced. Execute this program several times to confirm that every integer produced is consumed once. Notice the lines indicating when the producer and consumer must wait to perform their respective tasks.
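The "exactly once" property can also be checked mechanically rather than by reading the output. The sketch below assumes that the produced and consumed values have been recorded in two lists; class ExactlyOnceCheck and method ConsumedExactlyOnce are hypothetical helpers and are not part of the chapter's figures.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper; not part of the chapter's figures.
public class ExactlyOnceCheck
{
    // returns true if consumed holds the same values as produced, in the same order
    public static bool ConsumedExactlyOnce( IList<int> produced, IList<int> consumed )
    {
        if ( produced.Count != consumed.Count )
            return false; // a value was lost or was read more than once

        for ( int i = 0; i < produced.Count; i++ )
        {
            if ( produced[ i ] != consumed[ i ] )
                return false; // a value was skipped, repeated or reordered
        }

        return true;
    }

    public static void Main()
    {
        int[] produced = { 1, 2, 3, 4 };

        Console.WriteLine( ConsumedExactlyOnce( produced, new int[] { 1, 2, 3, 4 } ) ); // True
        Console.WriteLine( ConsumedExactlyOnce( produced, new int[] { 1, 1, 3, 4 } ) ); // False
        Console.WriteLine( ConsumedExactlyOnce( produced, new int[] { 1, 2, 4 } ) );    // False
    }
}
```

The second call models a value that was read twice while another was overwritten, and the third models a lost value; both are the kinds of errors that synchronization is meant to rule out.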