Producer/Consumer Relationship without Synchronization

In a producer/consumer relationship, the producer portion of an application generates data and stores it in a shared object, and the consumer portion of an application reads data from the shared object. One example of a common producer/consumer relationship is print spooling. A word processor spools data to a buffer (typically a file) and that data is subsequently consumed by the printer as it prints the document. Similarly, an application that copies data onto compact discs places data in a fixed-size buffer that is emptied as the CD-RW drive burns the data onto the compact disc.

In a multithreaded producer/consumer relationship, a producer thread generates data and places it in a shared object called a buffer. A consumer thread reads data from the buffer. If the producer, waiting to put the next data into the buffer, determines that the consumer has not yet read the previous data from the buffer, the producer thread should call await so that the consumer can read the data before further updates; otherwise, the consumer never sees the previous data and that data is lost to the application. When the consumer thread reads the data, it should call signal to allow a waiting producer to store the next value. If a consumer thread finds the buffer empty or finds that the previous data has already been read, the consumer should call await; otherwise, the consumer might read old data again from the buffer. When the producer places the next data into the buffer, the producer should call signal to allow the consumer thread to proceed, so that the consumer can read the new data.
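The await/signal protocol described above can be sketched with a java.util.concurrent.locks.Lock and two Condition objects. This is only a minimal illustration under stated assumptions, not the implementation that Section 23.7 presents: the class name SingleSlotBuffer, the occupied flag, the condition names canWrite and canRead, and the helper produceAndConsume are all hypothetical, and the lambda syntax assumes Java 8 or later.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a one-value buffer guarded by a Lock and two Conditions.
class SingleSlotBuffer
{
   private final Lock lock = new ReentrantLock();
   private final Condition canWrite = lock.newCondition(); // producer waits here
   private final Condition canRead = lock.newCondition(); // consumer waits here
   private int value = -1;
   private boolean occupied = false; // true while an unread value is stored

   public void set( int newValue ) throws InterruptedException
   {
      lock.lock();
      try
      {
         while ( occupied ) // previous value not yet consumed
            canWrite.await();
         value = newValue;
         occupied = true;
         canRead.signal(); // let a waiting consumer proceed
      }
      finally
      {
         lock.unlock();
      }
   }

   public int get() throws InterruptedException
   {
      lock.lock();
      try
      {
         while ( !occupied ) // nothing new to read yet
            canRead.await();
         occupied = false;
         canWrite.signal(); // let a waiting producer proceed
         return value;
      }
      finally
      {
         lock.unlock();
      }
   }

   // Produces 1..10 on a second thread and sums what the calling thread consumes.
   static int produceAndConsume()
   {
      final SingleSlotBuffer buffer = new SingleSlotBuffer();
      Thread producer = new Thread( () -> {
         try
         {
            for ( int count = 1; count <= 10; count++ )
               buffer.set( count );
         }
         catch ( InterruptedException exception )
         {
            Thread.currentThread().interrupt();
         }
      } );
      producer.start();

      int sum = 0;
      try
      {
         for ( int count = 1; count <= 10; count++ )
            sum += buffer.get();
         producer.join();
      }
      catch ( InterruptedException exception )
      {
         Thread.currentThread().interrupt();
         throw new IllegalStateException( exception );
      }
      return sum;
   }

   public static void main( String[] args )
   {
      System.out.println( "Consumer total: " + produceAndConsume() ); // Consumer total: 55
   }
}
```

Each wait sits inside a while loop so that the condition is rechecked after the thread wakes, and each unlock sits in a finally block so the lock is released even if the wait is interrupted.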

Let us consider how logic errors can arise if we do not synchronize access among multiple threads manipulating shared data. Our next example (Figs. 23.6-23.10) implements a producer/consumer relationship in which a producer thread writes the numbers 1 through 10 into a shared buffer, a memory location shared between two threads (a single int variable called buffer in line 6 of Fig. 23.9 in this example). The consumer thread reads this data from the shared buffer and displays the data. The program's output shows the values that the producer writes (produces) into the shared buffer and the values that the consumer reads (consumes) from the shared buffer.

Figure 23.6. Buffer interface used in producer/consumer examples.

 1  // Fig. 23.6: Buffer.java
 2  // Buffer interface specifies methods called by Producer and Consumer.
 3
 4  public interface Buffer
 5  {
 6     public void set( int value ); // place int value into Buffer
 7     public int get(); // return int value from Buffer
 8  } // end interface Buffer

Each value the producer thread writes to the shared buffer must be consumed exactly once by the consumer thread. However, the threads in this example are not synchronized. Therefore, data can be lost if the producer places new data into the shared buffer before the consumer consumes the previous data. Also, data can be incorrectly duplicated if the consumer consumes data again before the producer produces the next value. To show these possibilities, the consumer thread in the following example keeps a total of all the values it reads. The producer thread produces values from 1 through 10. If the consumer reads each value produced once and only once, the total will be 55. However, if you execute this program several times, you will see that the total is not always 55 (as shown in the outputs in Fig. 23.10). To emphasize the point, the producer and consumer threads in the example each sleep for random intervals of up to three seconds between performing their tasks. Thus, we do not know exactly when the producer thread will attempt to write a new value, nor do we know when the consumer thread will attempt to read a value.
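The arithmetic behind the expected total of 55 can be checked without any threads by replaying a fixed order of writes and reads. The sketch below is a hypothetical helper, not part of the chapter's program: the class ScheduleReplay, its methods, and the schedule notation ('P' for a producer write, 'C' for a consumer read) are assumptions made for illustration.

```java
// Hypothetical single-threaded replay of a producer/consumer schedule.
class ScheduleReplay
{
   // 'P' = producer writes its next value (1, 2, 3, ...); 'C' = consumer reads the buffer.
   static int consumerTotal( String schedule )
   {
      int buffer = -1; // same initial value as the unsynchronized buffer
      int nextValue = 1;
      int sum = 0;

      for ( char action : schedule.toCharArray() )
      {
         if ( action == 'P' )
            buffer = nextValue++; // overwrites whatever was there
         else if ( action == 'C' )
            sum += buffer; // reads the current value, fresh or stale
      }
      return sum;
   }

   // Builds a schedule by repeating a unit such as "PC".
   static String repeat( String unit, int times )
   {
      StringBuilder result = new StringBuilder();
      for ( int i = 0; i < times; i++ )
         result.append( unit );
      return result.toString();
   }

   public static void main( String[] args )
   {
      // strict alternation: every value is read exactly once
      System.out.println( consumerTotal( repeat( "PC", 10 ) ) ); // 55
      // producer finishes first: values 1 to 9 are lost, 10 is read ten times
      System.out.println( consumerTotal( repeat( "P", 10 ) + repeat( "C", 10 ) ) ); // 100
      // consumer finishes first: the initial -1 is read ten times
      System.out.println( consumerTotal( repeat( "C", 10 ) + repeat( "P", 10 ) ) ); // -10
   }
}
```

Only the strictly alternating schedule yields 55; any schedule in which a write overtakes a read, or a read repeats before the next write, changes the total.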

The program consists of interface Buffer (Fig. 23.6) and four classes: Producer (Fig. 23.7), Consumer (Fig. 23.8), UnsynchronizedBuffer (Fig. 23.9) and SharedBufferTest (Fig. 23.10). Interface Buffer declares methods set and get that a Buffer must implement to enable the Producer thread to place a value in the Buffer and the Consumer thread to retrieve a value from the Buffer. We will see the implementation of this interface in Fig. 23.9.

Figure 23.7. Producer represents the producer thread in a producer/consumer relationship.

 1  // Fig. 23.7: Producer.java
 2  // Producer's run method stores the values 1 to 10 in buffer.
 3  import java.util.Random;
 4
 5  public class Producer implements Runnable
 6  {
 7     private static Random generator = new Random();
 8     private Buffer sharedLocation; // reference to shared object
 9
10     // constructor
11     public Producer( Buffer shared )
12     {
13        sharedLocation = shared;
14     } // end Producer constructor
15
16     // store values from 1 to 10 in sharedLocation
17     public void run()
18     {
19        int sum = 0;
20
21        for ( int count = 1; count <= 10; count++ )
22        {
23           try // sleep 0 to 3 seconds, then place value in Buffer
24           {
25              Thread.sleep( generator.nextInt( 3000 ) ); // sleep thread
26              sharedLocation.set( count ); // set value in buffer
27              sum += count; // increment sum of values
28              System.out.printf( "\t%2d\n", sum );
29           } // end try
30           // if sleeping thread interrupted, print stack trace
31           catch ( InterruptedException exception )
32           {
33              exception.printStackTrace();
34           } // end catch
35        } // end for
36
37        System.out.printf( "\n%s\n%s\n", "Producer done producing.",
38           "Terminating Producer." );
39     } // end method run
40  } // end class Producer

Figure 23.8. Consumer represents the consumer thread in a producer/consumer relationship.

 1  // Fig. 23.8: Consumer.java
 2  // Consumer's run method loops ten times reading a value from buffer.
 3  import java.util.Random;
 4
 5  public class Consumer implements Runnable
 6  {
 7     private static Random generator = new Random();
 8     private Buffer sharedLocation; // reference to shared object
 9
10     // constructor
11     public Consumer( Buffer shared )
12     {
13        sharedLocation = shared;
14     } // end Consumer constructor
15
16     // read sharedLocation's value ten times and sum the values
17     public void run()
18     {
19        int sum = 0;
20
21        for ( int count = 1; count <= 10; count++ )
22        {
23           // sleep 0 to 3 seconds, read value from buffer and add to sum
24           try
25           {
26              Thread.sleep( generator.nextInt( 3000 ) );
27              sum += sharedLocation.get();
28              System.out.printf( "\t\t\t%2d\n", sum );
29           } // end try
30           // if sleeping thread interrupted, print stack trace
31           catch ( InterruptedException exception )
32           {
33              exception.printStackTrace();
34           } // end catch
35        } // end for
36
37        System.out.printf( "\n%s %d.\n%s\n",
38           "Consumer read values totaling", sum, "Terminating Consumer." );
39     } // end method run
40  } // end class Consumer

Figure 23.9. UnsynchronizedBuffer maintains the shared integer that is accessed by a producer thread and a consumer thread via methods set and get.

 1  // Fig. 23.9: UnsynchronizedBuffer.java
 2  // UnsynchronizedBuffer represents a single shared integer.
 3
 4  public class UnsynchronizedBuffer implements Buffer
 5  {
 6     private int buffer = -1; // shared by producer and consumer threads
 7
 8     // place value into buffer
 9     public void set( int value )
10     {
11        System.out.printf( "Producer writes\t%2d", value );
12        buffer = value;
13     } // end method set
14
15     // return value from buffer
16     public int get()
17     {
18        System.out.printf( "Consumer reads\t%2d", buffer );
19        return buffer;
20     } // end method get
21  } // end class UnsynchronizedBuffer

Figure 23.10. SharedBufferTest sets up a producer/consumer application that uses an unsynchronized buffer.

 1  // Fig. 23.10: SharedBufferTest.java
 2  // Application shows two threads manipulating an unsynchronized buffer.
 3  import java.util.concurrent.ExecutorService;
 4  import java.util.concurrent.Executors;
 5
 6  public class SharedBufferTest
 7  {
 8     public static void main( String[] args )
 9     {
10        // create new thread pool with two threads
11        ExecutorService application = Executors.newFixedThreadPool( 2 );
12
13        // create UnsynchronizedBuffer to store ints
14        Buffer sharedLocation = new UnsynchronizedBuffer();
15
16        System.out.println( "Action\t\tValue\tProduced\tConsumed" );
17        System.out.println( "------\t\t-----\t--------\t--------\n" );
18
19        // try to start producer and consumer giving each of them access
20        // to sharedLocation
21        try
22        {
23           application.execute( new Producer( sharedLocation ) );
24           application.execute( new Consumer( sharedLocation ) );
25        } // end try
26        catch ( Exception exception )
27        {
28           exception.printStackTrace();
29        } // end catch
30
31        application.shutdown(); // terminate application when threads end
32     } // end main
33  } // end class SharedBufferTest

Action            Value   Produced   Consumed
------            -----   --------   --------
Producer writes     1         1
Producer writes     2         3
Producer writes     3         6
Consumer reads      3                    3
Producer writes     4        10
Consumer reads      4                    7
Producer writes     5        15
Producer writes     6        21
Producer writes     7        28
Consumer reads      7                   14
Consumer reads      7                   21
Producer writes     8        36
Consumer reads      8                   29
Consumer reads      8                   37
Producer writes     9        45
Producer writes    10        55
Producer done producing.
Terminating Producer.
Consumer reads     10                   47
Consumer reads     10                   57
Consumer reads     10                   67
Consumer reads     10                   77
Consumer read values totaling 77.
Terminating Consumer.

Action            Value   Produced   Consumed
------            -----   --------   --------
Consumer reads     -1                   -1
Producer writes     1         1
Consumer reads      1                    0
Consumer reads      1                    1
Consumer reads      1                    2
Consumer reads      1                    3
Consumer reads      1                    4
Producer writes     2         3
Consumer reads      2                    6
Producer writes     3         6
Consumer reads      3                    9
Producer writes     4        10
Consumer reads      4                   13
Producer writes     5        15
Producer writes     6        21
Consumer reads      6                   19
Consumer read values totaling 19.
Terminating Consumer.
Producer writes     7        28
Producer writes     8        36
Producer writes     9        45
Producer writes    10        55
Producer done producing.
Terminating Producer.

Class Producer (Fig. 23.7) implements the Runnable interface, allowing it to be executed in a separate thread. The constructor (lines 11-14) initializes Buffer reference sharedLocation with an object created in main (line 14 of Fig. 23.10) and passed to the constructor in the parameter shared. As we will see, this is an UnsynchronizedBuffer object that implements interface Buffer without synchronizing access to the shared object. The Producer thread in this program executes the tasks specified in method run (lines 17-39). Each iteration of the loop (lines 21-35) invokes Thread method sleep (line 25) to place the Producer thread into the timed waiting state for a random time interval between 0 and 3 seconds. When the thread awakens, line 26 passes the value of control variable count to the Buffer object's set method to set the shared buffer's value. Line 27 keeps a total of all the values produced so far and line 28 outputs that value. When the loop completes, lines 37-38 display a message indicating that the thread has finished producing data and is terminating. Next, method run terminates, which indicates that the Producer completed its task. It is important to note that any method called from a thread's run method (e.g., Buffer method set) executes as part of that thread of execution. In fact, each thread has its own method call stack. This fact becomes important in Section 23.7 when we add synchronization to the producer/consumer relationship.
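The point that a method called from run executes on the calling thread can be observed directly with Thread.currentThread(). The following is a small sketch; the class CallingThreadDemo, its methods, and the thread name "producer-thread" are hypothetical and chosen only for illustration (the lambda assumes Java 8 or later).

```java
// Hypothetical demo: a method reports which thread is executing it.
class CallingThreadDemo
{
   // Returns the name of whichever thread invokes this method.
   static String whoIsCalling()
   {
      return Thread.currentThread().getName();
   }

   // Calls whoIsCalling() from a named thread's run method and returns what it reported.
   static String nameSeenInside( String threadName )
   {
      final String[] seen = new String[ 1 ];
      Thread worker = new Thread( () -> seen[ 0 ] = whoIsCalling(), threadName );
      worker.start();

      try
      {
         worker.join(); // wait for the worker so its result is visible here
      }
      catch ( InterruptedException exception )
      {
         Thread.currentThread().interrupt();
         throw new IllegalStateException( exception );
      }
      return seen[ 0 ];
   }

   public static void main( String[] args )
   {
      // the same method reports a different thread depending on who calls it
      System.out.println( nameSeenInside( "producer-thread" ) ); // producer-thread
      System.out.println( whoIsCalling() ); // name of the thread running main
   }
}
```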

Class Consumer (Fig. 23.8) also implements interface Runnable, allowing the Consumer to be executed concurrently with the Producer. The constructor (lines 11-14) initializes Buffer reference sharedLocation with an object that implements the Buffer interface created in main (Fig. 23.10) and passed to the constructor as the parameter shared. As we will see, this is the same UnsynchronizedBuffer object that is used to initialize the Producer object; thus, the two threads share the same object. The Consumer thread in this program performs the tasks specified in method run (lines 17-39). The loop at lines 21-35 iterates ten times. Each iteration of the loop invokes Thread method sleep (line 26) to put the Consumer thread into the timed waiting state for between 0 and 3 seconds. Next, line 27 uses the Buffer's get method to retrieve the value in the shared buffer, then adds the value to variable sum. Line 28 displays the total of all the values consumed so far. When the loop completes, lines 37-38 display a line indicating the sum of the consumed values. Then method run terminates, which indicates that the Consumer completed its task. Once both threads enter the terminated state, the program ends.

[Note: We use method sleep in method run of the Producer and Consumer classes to emphasize the fact that in multithreaded applications, it is unpredictable when each thread will perform its task and for how long it will perform the task when it has a processor. Normally, these thread-scheduling issues are the job of the computer's operating system and therefore beyond the control of the Java developer. In this program, our threads' tasks are quite simple: for the Producer, write the values 1 to 10 to the buffer, and for the Consumer, read 10 values from the buffer and add each value to variable sum. Without the sleep method call, and if the Producer executes first, given today's phenomenally fast processors, the Producer would likely complete its task before the Consumer gets a chance to execute. If the Consumer executes first, it would likely consume -1 ten times, then terminate before the Producer could produce the first real value.]

Class UnsynchronizedBuffer (Fig. 23.9) implements interface Buffer (line 4). An object of this class is shared between the Producer and the Consumer. Line 6 declares instance variable buffer and initializes it with the value -1. This value is used to demonstrate the case in which the Consumer attempts to consume a value before the Producer ever places a value in buffer. Methods set (lines 9-13) and get (lines 16-20) do not synchronize access to field buffer. Method set simply assigns its argument to buffer (line 12), and method get simply returns the value of buffer (line 19).

Class SharedBufferTest contains method main (lines 8-32), which launches the application. Line 11 creates an ExecutorService with two threads, one to execute the Producer and one to execute the Consumer. Line 14 creates an UnsynchronizedBuffer object and assigns its reference to Buffer variable sharedLocation. This object stores the data that will be shared between the Producer and Consumer threads. Lines 23-24 create and execute the Producer and Consumer. Note that the Producer and Consumer constructors are each passed the same Buffer object (sharedLocation), so each object is initialized with a reference to the same Buffer. These lines also implicitly launch the threads and call each Runnable's run method. Finally, line 31 calls method shutdown so that the application can terminate when the Producer and Consumer threads complete their tasks. When method main terminates (line 32), the main thread of execution enters the terminated state.
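The behavior of shutdown described above (tasks already submitted still run to completion) can be seen in isolation. The sketch below is an assumption-laden illustration rather than part of the chapter's program: the class ExecutorShutdownDemo and the counter are hypothetical, and the call to awaitTermination, which SharedBufferTest does not use, is added only so the calling thread can observe the result.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: tasks submitted before shutdown() still complete.
class ExecutorShutdownDemo
{
   // Submits two trivial tasks, shuts the pool down and reports how many tasks ran.
   static int runTwoTasks()
   {
      ExecutorService pool = Executors.newFixedThreadPool( 2 );
      final AtomicInteger finished = new AtomicInteger();

      pool.execute( () -> finished.incrementAndGet() ); // stands in for the Producer
      pool.execute( () -> finished.incrementAndGet() ); // stands in for the Consumer

      pool.shutdown(); // accept no new tasks; submitted tasks keep running

      try
      {
         // wait (up to 5 seconds) for both tasks to finish
         if ( !pool.awaitTermination( 5, TimeUnit.SECONDS ) )
            throw new IllegalStateException( "tasks did not finish in time" );
      }
      catch ( InterruptedException exception )
      {
         Thread.currentThread().interrupt();
         throw new IllegalStateException( exception );
      }
      return finished.get();
   }

   public static void main( String[] args )
   {
      System.out.println( "Tasks completed: " + runTwoTasks() ); // Tasks completed: 2
   }
}
```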

Recall from the overview of this example that we would like the Producer thread to execute first and every value produced by the Producer to be consumed exactly once by the Consumer. However, when we study the first output of Fig. 23.10, we see that the Producer writes a value three times before the Consumer reads its first value (3). Therefore, the values 1 and 2 are lost. Later, the values 5, 6 and 9 are lost, while 7 and 8 are read twice and 10 is read four times. So the first output produced an incorrect total of 77, instead of the correct total of 55. In the second output, note that the Consumer reads before the Producer ever writes a value, so it consumes the initial value -1. Also note that the Consumer has already read the value 1 five times before the Producer writes the value 2. Meanwhile, the values 5, 7, 8, 9 and 10 are all lost. An incorrect total of 19 is produced. This example clearly demonstrates that access to a shared object by concurrent threads must be controlled carefully, for otherwise a program may produce incorrect results.
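The lost and duplicated values can be tallied mechanically from the sequence of values the Consumer read in each sample output of Fig. 23.10. The following sketch does that bookkeeping; the class ConsumptionAudit and its methods are hypothetical helpers written for this illustration, and the arrays simply transcribe the "Consumer reads" lines of the two outputs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical audit of which produced values (1..10) were lost or read repeatedly.
class ConsumptionAudit
{
   // Counts how many times value appears in reads.
   private static int count( int[] reads, int value )
   {
      int occurrences = 0;
      for ( int read : reads )
         if ( read == value )
            occurrences++;
      return occurrences;
   }

   // Values from 1 to 10 that the consumer never read.
   static List<Integer> lost( int[] reads )
   {
      List<Integer> result = new ArrayList<>();
      for ( int value = 1; value <= 10; value++ )
         if ( count( reads, value ) == 0 )
            result.add( value );
      return result;
   }

   // Values from 1 to 10 that the consumer read more than once.
   static List<Integer> duplicated( int[] reads )
   {
      List<Integer> result = new ArrayList<>();
      for ( int value = 1; value <= 10; value++ )
         if ( count( reads, value ) > 1 )
            result.add( value );
      return result;
   }

   // Sum of everything the consumer read, including the initial -1 if present.
   static int total( int[] reads )
   {
      int sum = 0;
      for ( int read : reads )
         sum += read;
      return sum;
   }

   public static void main( String[] args )
   {
      int[] firstRun = { 3, 4, 7, 7, 8, 8, 10, 10, 10, 10 };
      int[] secondRun = { -1, 1, 1, 1, 1, 1, 2, 3, 4, 6 };

      System.out.println( lost( firstRun ) + " " + duplicated( firstRun )
         + " " + total( firstRun ) ); // [1, 2, 5, 6, 9] [7, 8, 10] 77
      System.out.println( lost( secondRun ) + " " + duplicated( secondRun )
         + " " + total( secondRun ) ); // [5, 7, 8, 9, 10] [1] 19
   }
}
```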

To solve the problems of lost and duplicated data, Section 23.7 presents an example in which we use a Lock and Condition methods await and signal to synchronize access to the code that manipulates the shared object, guaranteeing that each and every value will be processed once and only once. When a thread acquires a lock, no other threads can acquire that same lock until the original thread releases it.
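The exclusion property stated in the last sentence can be observed with ReentrantLock, one implementation of the Lock interface. This is a minimal sketch under stated assumptions: the class LockExclusionDemo and its helper are hypothetical, and tryLock is used here only because it reports failure instead of blocking, which makes the effect easy to see.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a second thread cannot acquire a lock that another thread holds.
class LockExclusionDemo
{
   // Starts another thread that tries once to take the lock; returns whether it succeeded.
   static boolean otherThreadCanAcquire( final ReentrantLock lock )
   {
      final boolean[] acquired = new boolean[ 1 ];
      Thread other = new Thread( () -> {
         acquired[ 0 ] = lock.tryLock(); // returns immediately, never blocks
         if ( acquired[ 0 ] )
            lock.unlock(); // release again so later attempts are unaffected
      } );
      other.start();

      try
      {
         other.join();
      }
      catch ( InterruptedException exception )
      {
         Thread.currentThread().interrupt();
         throw new IllegalStateException( exception );
      }
      return acquired[ 0 ];
   }

   public static void main( String[] args )
   {
      ReentrantLock lock = new ReentrantLock();

      lock.lock(); // the main thread now owns the lock
      try
      {
         System.out.println( otherThreadCanAcquire( lock ) ); // false
      }
      finally
      {
         lock.unlock(); // the main thread releases the lock
      }

      System.out.println( otherThreadCanAcquire( lock ) ); // true
   }
}
```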
