Producer/Consumer Relationship without Thread Synchronization

In a producer/consumer relationship, the producer portion of an application generates data and the consumer portion of an application uses that data. In a multithreaded producer/consumer relationship, a producer thread calls a produce method to generate data and place it in a shared region of memory, called a buffer. A consumer thread calls a consume method to read that data.

If the producer wishes to put the next data in the buffer but determines that the consumer has not yet read the previous data from the buffer, the producer thread should call Wait. Otherwise, the previous data would be overwritten before the consumer ever saw it and would be lost to the application. When the consumer thread reads the data, it should call Pulse to allow a waiting producer to proceed, since there is now free space in the buffer.

If a consumer thread finds the buffer empty or finds that the previous data has already been read, the consumer should call Wait. Otherwise, the consumer might read "garbage" from the buffer, or the consumer might process a previous data item more than once; each of these possibilities results in a logic error in the application. When the producer places the next data into the buffer, the producer should call Pulse to allow the consumer thread to proceed and read that data.

Let us consider how logic errors can arise if we do not synchronize access among multiple threads manipulating shared data. Consider a producer/consumer relationship in which a producer thread writes a sequence of numbers (we use 1-10) into a shared buffer, a memory location shared between multiple threads. The consumer thread reads this data from the shared buffer, then displays the data. We display in the program's output the values that the producer writes (produces) and that the consumer reads (consumes). Figures 15.4-15.8 demonstrate a producer thread and a consumer thread accessing a single shared int variable without any synchronization. The producer thread writes to the variable; the consumer thread reads from it. We would like each value the producer thread writes to the shared variable to be consumed exactly once by the consumer thread. However, the threads in this example are not synchronized. Therefore, data can be lost if the producer places new data in the variable before the consumer consumes the previous data. Also, data can be incorrectly repeated if the consumer consumes data again before the producer produces the next value. If the consumer attempts to read before the producer produces the first value, the consumer reads garbage (in this example, the buffer's initial value of -1).

To show these possibilities, the consumer thread in the example keeps a total of all the values it reads. The producer thread produces values from 1 to 10. If the consumer reads each value produced once and only once, the total would be 55. However, when you execute this program several times, you will see that the total is rarely, if ever, 55. Also, to emphasize our point, the producer and consumer threads in the example each sleep for random intervals of up to three seconds between performing their tasks. Thus, we do not know exactly when the producer thread will attempt to write a new value, nor do we know when the consumer thread will attempt to read a value.

The program consists of interface Buffer (Fig. 15.4) and classes Producer (Fig. 15.5), Consumer (Fig. 15.6), UnsynchronizedBuffer (Fig. 15.7) and UnsynchronizedBufferTest (Fig. 15.8). Interface Buffer declares an int property called Buffer. Any implementation of Buffer must provide a get accessor and a set accessor for this property to allow the producer and consumer to access the shared data.

Figure 15.4. Buffer interface used in producer/consumer examples.

 1  // Fig. 15.4: Buffer.cs
 2  // Interface for a shared buffer of int.
 3  using System;
 4
 5  // this interface represents a shared buffer
 6  public interface Buffer
 7  {
 8     // property Buffer
 9     int Buffer
10     {
11        get;
12        set;
13     } // end property Buffer
14  } // end interface Buffer

Figure 15.5. Producer represents the producer thread in a producer/consumer relationship.

 1  // Fig. 15.5: Producer.cs
 2  // Producer produces 10 integer values in the shared buffer.
 3  using System;
 4  using System.Threading;
 5
 6  // class Producer's Produce method controls a thread that
 7  // stores values from 1 to 10 in sharedLocation
 8  public class Producer
 9  {
10     private Buffer sharedLocation;
11     private Random randomSleepTime;
12
13     // constructor
14     public Producer( Buffer shared, Random random )
15     {
16        sharedLocation = shared;
17        randomSleepTime = random;
18     } // end constructor
19
20     // store values 1-10 in object sharedLocation
21     public void Produce()
22     {
23        // sleep for random interval up to 3000 milliseconds
24        // then set sharedLocation's Buffer property
25        for ( int count = 1; count <= 10; count++ )
26        {
27           Thread.Sleep( randomSleepTime.Next( 1, 3001 ) );
28           sharedLocation.Buffer = count;
29        } // end for
30
31        Console.WriteLine( "{0} done producing. Terminating {0}.",
32           Thread.CurrentThread.Name );
33     } // end method Produce
34  } // end class Producer

Figure 15.6. Consumer represents the consumer thread in a producer/consumer relationship.

 1  // Fig. 15.6: Consumer.cs
 2  // Consumer consumes 10 integer values from the shared buffer.
 3  using System;
 4  using System.Threading;
 5
 6  // class Consumer's Consume method controls a thread that
 7  // loops 10 times and reads a value from sharedLocation
 8  public class Consumer
 9  {
10     private Buffer sharedLocation;
11     private Random randomSleepTime;
12
13     // constructor
14     public Consumer( Buffer shared, Random random )
15     {
16        sharedLocation = shared;
17        randomSleepTime = random;
18     } // end constructor
19
20     // read sharedLocation's value ten times
21     public void Consume()
22     {
23        int sum = 0;
24
25        // sleep for random interval up to 3000 milliseconds then
26        // add sharedLocation's Buffer property value to sum
27        for ( int count = 1; count <= 10; count++ )
28        {
29           Thread.Sleep( randomSleepTime.Next( 1, 3001 ) );
30           sum += sharedLocation.Buffer;
31        } // end for
32
33        Console.WriteLine(
34           "{0} read values totaling: {1}. Terminating {0}.",
35           Thread.CurrentThread.Name, sum );
36     } // end method Consume
37  } // end class Consumer

Figure 15.7. UnsynchronizedBuffer maintains the shared integer variable that is accessed by a producer thread and a consumer thread via property Buffer.

 1  // Fig. 15.7: UnsynchronizedBuffer.cs
 2  // An unsynchronized shared buffer implementation.
 3  using System;
 4  using System.Threading;
 5
 6  // this class represents a single shared int
 7  public class UnsynchronizedBuffer : Buffer
 8  {
 9     // buffer shared by producer and consumer threads
10     private int buffer = -1;
11
12     // property Buffer
13     public int Buffer
14     {
15        get
16        {
17           Console.WriteLine( "{0} reads {1}",
18              Thread.CurrentThread.Name, buffer );
19           return buffer;
20        } // end get
21        set
22        {
23           Console.WriteLine( "{0} writes {1}",
24              Thread.CurrentThread.Name, value );
25           buffer = value;
26        } // end set
27     } // end property Buffer
28  } // end class UnsynchronizedBuffer

Figure 15.8. Producer and consumer threads accessing a shared object without synchronization.

 1  // Fig. 15.8: UnsynchronizedBufferTest.cs
 2  // Showing multiple threads modifying a shared object without
 3  // synchronization.
 4  using System;
 5  using System.Threading;
 6
 7  // this class creates producer and consumer threads
 8  class UnsynchronizedBufferTest
 9  {
10     // create producer and consumer threads and start them
11     static void Main( string[] args )
12     {
13        // create shared object used by threads
14        UnsynchronizedBuffer shared = new UnsynchronizedBuffer();
15
16        // Random object used by each thread
17        Random random = new Random();
18
19        // create Producer and Consumer objects
20        Producer producer = new Producer( shared, random );
21        Consumer consumer = new Consumer( shared, random );
22
23        // create threads for producer and consumer and set
24        // delegates for each thread
25        Thread producerThread =
26           new Thread( new ThreadStart( producer.Produce ) );
27        producerThread.Name = "Producer";
28
29        Thread consumerThread =
30           new Thread( new ThreadStart( consumer.Consume ) );
31        consumerThread.Name = "Consumer";
32
33        // start each thread
34        producerThread.Start();
35        consumerThread.Start();
36     } // end Main
37  } // end class UnsynchronizedBufferTest

Consumer reads -1
Producer writes 1
Consumer reads 1
Producer writes 2
Consumer reads 2
Consumer reads 2
Producer writes 3
Consumer reads 3
Consumer reads 3
Producer writes 4
Consumer reads 4
Producer writes 5
Consumer reads 5
Consumer reads 5
Producer writes 6
Consumer reads 6
Consumer read values totaling: 30. Terminating Consumer.
Producer writes 7
Producer writes 8
Producer writes 9
Producer writes 10
Producer done producing. Terminating Producer.

Producer writes 1
Producer writes 2
Consumer reads 2
Consumer reads 2
Producer writes 3
Producer writes 4
Consumer reads 4
Consumer reads 4
Producer writes 5
Consumer reads 5
Producer writes 6
Consumer reads 6
Producer writes 7
Consumer reads 7
Producer writes 8
Consumer reads 8
Producer writes 9
Producer writes 10
Producer done producing. Terminating Producer.
Consumer reads 10
Consumer reads 10
Consumer read values totaling: 58. Terminating Consumer.

Class Producer (Figure 15.5) consists of instance variable sharedLocation (line 10) of type Buffer, instance variable randomSleepTime (line 11) of type Random, a constructor (lines 14-18) to initialize the instance variables and a Produce method (lines 21-33). The constructor initializes instance variable sharedLocation to refer to the Buffer object received from method Main as the parameter shared. The producer thread in this program executes the tasks specified in method Produce of class Producer. The for statement in method Produce (lines 25-29) loops 10 times. Each iteration of the loop first invokes Thread method Sleep to place the producer thread in the WaitSleepJoin state for a random time interval of up to 3 seconds (1 to 3000 milliseconds, because the upper bound passed to Next is exclusive). When the thread awakens, line 28 assigns the value of control variable count to sharedLocation's Buffer property. When the loop completes, lines 31-32 display a line of text in the console window indicating that the thread finished producing data and that the thread is terminating. The Produce method then terminates, and the producer thread enters the Stopped state.

Class Consumer (Figure 15.6) consists of instance variable sharedLocation (line 10) of type Buffer, instance variable randomSleepTime (line 11) of type Random, a constructor (lines 14-18) to initialize the instance variables and a Consume method (lines 21-36). The constructor initializes sharedLocation to refer to the Buffer object received from Main as the parameter shared. The consumer thread in this program performs the tasks specified in class Consumer's Consume method. The method contains a for statement (lines 27-31) that loops ten times. Each iteration of the loop invokes Thread method Sleep to put the consumer thread into the WaitSleepJoin state for a random time interval of up to 3 seconds (1 to 3000 milliseconds). Next, line 30 gets the value of sharedLocation's Buffer property and adds the value to variable sum. When the loop completes, lines 33-35 display a line in the console window indicating the sum of all values read. Again, ideally the total should be 55, but because access to the shared data is not synchronized, this sum will almost never appear. The Consume method then terminates, and the consumer thread enters the Stopped state.

We use method Sleep in this example's threads to emphasize the fact that in multithreaded applications, it is unclear when each thread will perform its task and for how long it will perform that task when it has the processor. Normally, these thread-scheduling issues are the job of the computer's operating system. In this program, our threads' tasks are quite simple: for the producer, loop 10 times and perform an assignment statement; for the consumer, loop 10 times and add a value to variable sum. Without the Sleep method call, and if the producer executes first, the producer would most likely complete its task before the consumer ever gets a chance to execute. If the consumer executes first, it would most likely consume -1 ten times, then terminate before the producer could produce the first real value.

Class UnsynchronizedBuffer (Figure 15.7) implements interface Buffer (line 7) and consists of instance variable buffer (line 10) and property Buffer (lines 13-27), which provides get and set accessors. Property Buffer's accessors do not synchronize access to instance variable buffer. Note that each accessor uses class Thread's static property CurrentThread to obtain a reference to the currently executing thread, then uses that thread's Name property to obtain the thread's name for output purposes.
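The following is a minimal sketch, not part of the book's figures, of how Thread.CurrentThread.Name reports the name of whichever thread is executing the code that reads it. The class ThreadNameDemo and method NameSeenByThread are hypothetical names introduced only for illustration, and the call to Join is used here simply so the result can be read safely after the worker thread finishes.

```csharp
using System;
using System.Threading;

// Hypothetical helper class, introduced only for this illustration.
public static class ThreadNameDemo
{
   // Starts a thread with the given name and returns the name that
   // thread observes through Thread.CurrentThread.Name.
   public static string NameSeenByThread( string name )
   {
      string observed = null;

      // the lambda runs on the new thread, so CurrentThread is that thread
      Thread worker = new Thread(
         new ThreadStart( () => observed = Thread.CurrentThread.Name ) );
      worker.Name = name;
      worker.Start();
      worker.Join(); // wait until the worker has stored its name

      return observed;
   } // end method NameSeenByThread

   public static void Main()
   {
      Console.WriteLine( NameSeenByThread( "Producer" ) );
      Console.WriteLine( NameSeenByThread( "Consumer" ) );
   } // end Main
} // end class ThreadNameDemo
```

Because the name is looked up on the executing thread, the same accessor code in UnsynchronizedBuffer can label its output "Producer" or "Consumer" without being told which thread called it.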

Class UnsynchronizedBufferTest (Figure 15.8) defines a Main method (lines 11-36) that instantiates a shared UnsynchronizedBuffer object (line 14) and a Random object (line 17) for generating random sleep times. These are used as arguments to the constructors for the objects of classes Producer (line 20) and Consumer (line 21). The UnsynchronizedBuffer object contains the data that will be shared between the producer and consumer threads. Because UnsynchronizedBuffer implements the Buffer interface, the Producer and Consumer constructors can each take an UnsynchronizedBuffer object and assign it to their respective Buffer variables named sharedLocation. Lines 25-27 create and name producerThread. The ThreadStart delegate for producerThread specifies that the thread will execute method Produce of object producer. Lines 29-31 create and name the consumerThread. The ThreadStart delegate for the consumerThread specifies that the thread will execute method Consume of object consumer. Finally, lines 34-35 place the two threads in the Running state by invoking each thread's Start method, then the thread executing Main terminates.

Ideally, we would like every value produced by the Producer object to be consumed exactly once by the Consumer object. However, when we study the first output of Fig. 15.8, we see that the consumer retrieved a value (-1) before the producer ever placed a value in the shared buffer and that the values 2, 3 and 5 were consumed twice each. The consumer finished executing before the producer had an opportunity to produce the values 7, 8, 9 and 10. Therefore, those four values were lost, and an incorrect sum resulted. In the second output, we see that the value 1 was lost, because the values 1 and 2 were produced before the consumer thread could read the value 1. The values 3 and 9 were also lost and the values 2, 4 and 10 were consumed twice each, also resulting in an incorrect sum. This example clearly demonstrates that access to shared data by concurrent threads must be controlled carefully; otherwise, a program may produce incorrect results.
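The bookkeeping in this analysis can be checked mechanically. The sketch below is an illustration rather than part of the book's program: it takes the sequences of values the consumer read in the two sample outputs of Fig. 15.8 and reports the total, the produced values that were never read, and the values read more than once. The class OutputCheck and its method names are hypothetical.

```csharp
using System;
using System.Linq;

// Hypothetical helper class, introduced only for this illustration.
public static class OutputCheck
{
   // the values the producer writes: 1 through 10
   private static readonly int[] Produced =
      Enumerable.Range( 1, 10 ).ToArray();

   // sum of everything the consumer read
   public static int Total( int[] reads )
   {
      return reads.Sum();
   }

   // produced values that the consumer never read
   public static int[] Lost( int[] reads )
   {
      return Produced.Except( reads ).ToArray();
   }

   // values the consumer read more than once, in ascending order
   public static int[] Repeated( int[] reads )
   {
      return reads.GroupBy( v => v )
         .Where( g => g.Count() > 1 )
         .Select( g => g.Key )
         .OrderBy( v => v )
         .ToArray();
   }

   private static string Show( int[] values )
   {
      return string.Join( ", ", values.Select( v => v.ToString() ) );
   }

   public static void Main()
   {
      // values read by the consumer, copied from the two sample outputs
      int[] first = { -1, 1, 2, 2, 3, 3, 4, 5, 5, 6 };
      int[] second = { 2, 2, 4, 4, 5, 6, 7, 8, 10, 10 };

      foreach ( int[] reads in new int[][] { first, second } )
         Console.WriteLine( "total {0}; lost: {1}; repeated: {2}",
            Total( reads ), Show( Lost( reads ) ),
            Show( Repeated( reads ) ) );
   } // end Main
} // end class OutputCheck
```

For the first sequence this reports a total of 30 with 7, 8, 9 and 10 lost; for the second, a total of 58 with 1, 3 and 9 lost. Neither run reaches the ideal total of 55.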

To solve the problems of lost data and data consumed more than once in the previous example, we will (in Figs. 15.9 and 15.10) synchronize the access of the concurrent producer and consumer threads to the code that manipulates the shared data by using Monitor class methods Enter, Wait, Pulse and Exit. When a thread uses synchronization to access a shared object, the object is locked, so no other thread can acquire the lock for that shared object at the same time.
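As a preview of the idea, the following is a rough sketch of a one-slot buffer that uses Monitor methods Enter, Wait, Pulse and Exit in the way the opening of this section describes. It is not the code of Figs. 15.9 and 15.10, which are not reproduced here. The class names SketchSynchronizedBuffer and SketchSynchronizedBufferTest and the occupied flag are assumptions made for illustration, and the sketch assumes exactly one producer thread and one consumer thread.

```csharp
using System;
using System.Threading;

// Hypothetical class: a single shared int guarded by a Monitor lock.
public class SketchSynchronizedBuffer
{
   private int buffer = -1;
   private bool occupied = false; // true while buffer holds an unread value

   public int Buffer
   {
      get
      {
         Monitor.Enter( this ); // acquire the lock on this object
         try
         {
            while ( !occupied )       // nothing new to read yet
               Monitor.Wait( this );  // release lock and wait for a Pulse

            occupied = false;         // the value is now consumed
            Monitor.Pulse( this );    // let a waiting producer proceed
            return buffer;
         }
         finally
         {
            Monitor.Exit( this ); // always release the lock
         }
      } // end get
      set
      {
         Monitor.Enter( this );
         try
         {
            while ( occupied )        // previous value not yet read
               Monitor.Wait( this );

            buffer = value;
            occupied = true;
            Monitor.Pulse( this );    // let a waiting consumer proceed
         }
         finally
         {
            Monitor.Exit( this );
         }
      } // end set
   } // end property Buffer
} // end class SketchSynchronizedBuffer

public static class SketchSynchronizedBufferTest
{
   // runs one producer (values 1-10) and one consumer; returns the sum read
   public static int Run()
   {
      SketchSynchronizedBuffer shared = new SketchSynchronizedBuffer();
      int sum = 0;

      Thread producer = new Thread( new ThreadStart( () =>
      {
         for ( int count = 1; count <= 10; count++ )
            shared.Buffer = count;
      } ) );
      Thread consumer = new Thread( new ThreadStart( () =>
      {
         for ( int count = 1; count <= 10; count++ )
            sum += shared.Buffer;
      } ) );

      producer.Start();
      consumer.Start();
      producer.Join(); // wait for both threads before reading sum
      consumer.Join();
      return sum;
   } // end method Run

   public static void Main()
   {
      Console.WriteLine( "Consumer read values totaling: {0}", Run() );
   } // end Main
} // end class SketchSynchronizedBufferTest
```

Each accessor rechecks its condition in a while loop after Wait returns, so a thread proceeds only when the buffer is really in the state it needs. With this handshake every value is read exactly once and the total is 55.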
