Producer/Consumer Relationship: Circular Buffer
The program in Section 23.7 uses thread synchronization to guarantee that two threads manipulate data in a shared buffer correctly. However, the application may not perform optimally. If the two threads operate at different speeds, one of them will spend more (or most) of its time waiting. For example, in the program in Section 23.7 we shared a single integer variable between the two threads. If the producer thread produces values faster than the consumer can consume them, then the producer thread waits for the consumer, because there are no other locations in memory in which to place the next value. Similarly, if the consumer consumes values faster than the producer produces them, the consumer waits until the producer places the next value in the shared location in memory. Even when we have threads that operate at the same relative speeds, those threads may occasionally become "out of sync" over a period of time, causing one of them to wait for the other. We cannot make assumptions about the relative speeds of concurrent threads. Interactions that occur with the operating system, the network, the user and other components can cause the threads to operate at different speeds. When this happens, threads wait. When threads wait excessively, programs become less efficient, user-interactive programs become less responsive and applications suffer longer delays.
To minimize the amount of waiting time for threads that share resources and operate at the same average speeds, we can implement a circular buffer that provides extra buffer space into which the producer can place values and from which the consumer can retrieve those values. Let us assume that the buffer is implemented as an array. The producer and consumer work from the beginning of the array. When either thread reaches the end of the array, it simply returns to the first element of the array to perform its next task. If the producer temporarily produces values faster than the consumer can consume them, the producer can write additional values into the extra buffer space (if any are available). This capability enables the producer to perform its task even though the consumer is not ready to receive the current value being produced. Similarly, if the consumer consumes faster than the producer produces new values, the consumer can read additional values (if there are any) from the buffer. This enables the consumer to keep busy even though the producer is not ready to produce additional values.
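The "return to the first element" behavior described above comes down to one line of remainder arithmetic. The following minimal sketch illustrates it in isolation. The class WrapAroundDemo and its helper method next are hypothetical names introduced only for this illustration; they are not part of the chapter's example code.

```java
// Minimal sketch of circular indexing; the class and method names are hypothetical.
class WrapAroundDemo
{
   // return the index that follows the given one, wrapping to 0 at the end
   static int next( int index, int length )
   {
      return ( index + 1 ) % length;
   }

   public static void main( String[] args )
   {
      int[] buffer = new int[ 3 ];
      int index = 0;

      // visit five consecutive positions; after index 2 the sequence restarts at 0
      for ( int step = 0; step < 5; step++ )
      {
         System.out.printf( "step %d uses index %d%n", step, index );
         index = next( index, buffer.length );
      }
   }
}
```

Because the remainder of a nonnegative value divided by the array length is always between 0 and the length minus 1, the index can never move past the last element.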
Note that the circular buffer would be inappropriate if the producer and the consumer operate consistently at different speeds. If the consumer always executes faster than the producer, then a buffer containing one location is enough. Additional locations would waste memory. If the producer always executes faster, only a buffer with an infinite number of locations would be able to absorb the extra production.
The key to using a circular buffer with a producer and consumer that operate at about the same speed is to provide it with enough locations to handle the anticipated "extra" production. If, over a period of time, we determine that the producer often produces as many as three more values than the consumer can consume, we can provide a buffer of at least three cells to handle the extra production. We do not want the buffer to be too small, because that would cause threads to wait longer. On the other hand, we do not want the buffer to be too large, because that would waste memory.
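One way to arrive at a figure such as "three more values" is to record the order in which values were produced and consumed and then find the largest backlog. The sketch below assumes a hypothetical trace format in which the character P marks a value produced and C marks a value consumed. The class BacklogEstimator and its method peakBacklog are illustrative names only, and the trace shown is made up for the demonstration.

```java
// Minimal sketch: estimate a buffer size from a recorded trace of events.
// The class, the method and the trace format are assumptions for illustration.
class BacklogEstimator
{
   // 'P' marks a value produced and 'C' marks a value consumed; returns the
   // largest number of values that were ever waiting to be consumed
   static int peakBacklog( String trace )
   {
      int backlog = 0; // values produced but not yet consumed
      int peak = 0; // largest backlog seen so far

      for ( char event : trace.toCharArray() )
      {
         if ( event == 'P' )
            backlog++;
         else if ( event == 'C' && backlog > 0 )
            backlog--;

         peak = Math.max( peak, backlog );
      }

      return peak;
   }

   public static void main( String[] args )
   {
      // in this made-up trace the producer gets as many as three values ahead
      System.out.println( peakBacklog( "PPCPPCC" ) );
   }
}
```

For this trace the backlog rises to three, which suggests a buffer of at least three cells under the reasoning above.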
Performance Tip 23.4
Even when using a circular buffer, it is possible that a producer thread could fill the buffer, which would force the producer thread to wait until a consumer consumes a value to free an element in the buffer. Similarly, if the buffer is empty at any given time, the consumer thread must wait until the producer produces another value. The key to using a circular buffer is to optimize the buffer size to minimize the amount of thread wait time.
The program in Fig. 23.13 and Fig. 23.14 demonstrates a producer and a consumer accessing a circular buffer (in this case, a shared array of three cells) with synchronization. In this version of the producer/consumer relationship, the consumer consumes a value only when the array is not empty and the producer produces a value only when the array is not full. The statements that created and started the thread objects in the main method of class SharedBufferTest2 (Fig. 23.12) now appear in class CircularBufferTest (Fig. 23.14).
Figure 23.13. CircularBuffer synchronizes access to a circular buffer containing three slots.
  1  // Fig. 23.13: CircularBuffer.java
  2  // CircularBuffer synchronizes access to a circular buffer of three integers.
  3  import java.util.concurrent.locks.Lock;
  4  import java.util.concurrent.locks.ReentrantLock;
  5  import java.util.concurrent.locks.Condition;
  6
  7  public class CircularBuffer implements Buffer
  8  {
  9     // Lock to control synchronization with this buffer
 10     private Lock accessLock = new ReentrantLock();
 11
 12     // conditions to control reading and writing
 13     private Condition canWrite = accessLock.newCondition();
 14     private Condition canRead = accessLock.newCondition();
 15
 16     private int[] buffer = { -1, -1, -1 };
 17
 18     private int occupiedBuffers = 0; // count number of buffers used
 19     private int writeIndex = 0; // index to write next value
 20     private int readIndex = 0; // index to read next value
 21
 22     // place value into buffer
 23     public void set( int value )
 24     {
 25        accessLock.lock(); // lock this object
 26
 27        // output thread information and buffer information, then wait
 28        try
 29        {
 30           // while no empty locations, place thread in waiting state
 31           while ( occupiedBuffers == buffer.length )
 32           {
 33              System.out.printf( "All buffers full. Producer waits.\n" );
 34              canWrite.await(); // await until a buffer element is free
 35           } // end while
 36
 37           buffer[ writeIndex ] = value; // set new buffer value
 38
 39           // update circular write index
 40           writeIndex = ( writeIndex + 1 ) % buffer.length;
 41
 42           occupiedBuffers++; // one more buffer element is full
 43           displayState( "Producer writes " + value );
 44           canRead.signal(); // signal threads waiting to read from buffer
 45        } // end try
 46        catch ( InterruptedException exception )
 47        {
 48           exception.printStackTrace();
 49        } // end catch
 50        finally
 51        {
 52           accessLock.unlock(); // unlock this object
 53        } // end finally
 54     } // end method set
 55
 56     // return value from buffer
 57     public int get()
 58     {
 59        int readValue = 0; // initialize value read from buffer
 60        accessLock.lock(); // lock this object
 61
 62        // wait until buffer has data, then read value
 63        try
 64        {
 65           // while no data to read, place thread in waiting state
 66           while ( occupiedBuffers == 0 )
 67           {
 68              System.out.printf( "All buffers empty. Consumer waits.\n" );
 69              canRead.await(); // await until a buffer element is filled
 70           } // end while
 71
 72           readValue = buffer[ readIndex ]; // read value from buffer
 73
 74           // update circular read index
 75           readIndex = ( readIndex + 1 ) % buffer.length;
 76
 77           occupiedBuffers--; // one more buffer element is empty
 78           displayState( "Consumer reads " + readValue );
 79           canWrite.signal(); // signal threads waiting to write to buffer
 80        } // end try
 81        // if waiting thread interrupted, print stack trace
 82        catch ( InterruptedException exception )
 83        {
 84           exception.printStackTrace();
 85        } // end catch
 86        finally
 87        {
 88           accessLock.unlock(); // unlock this object
 89        } // end finally
 90
 91        return readValue;
 92     } // end method get
 93
 94     // display current operation and buffer state
 95     public void displayState( String operation )
 96     {
 97        // output operation and number of occupied buffers
 98        System.out.printf( "%s%s%d)\n%s", operation,
 99           " (buffers occupied: ", occupiedBuffers, "buffers: " );
100
101        for ( int value : buffer )
102           System.out.printf( " %2d  ", value ); // output values in buffer
103
104        System.out.print( "\n         " );
105        for ( int i = 0; i < buffer.length; i++ )
106           System.out.print( "---- " );
107
108        System.out.print( "\n         " );
109        for ( int i = 0; i < buffer.length; i++ )
110        {
111           if ( i == writeIndex && i == readIndex )
112              System.out.print( " WR  " ); // both write and read index
113           else if ( i == writeIndex )
114              System.out.print( " W   " ); // just write index
115           else if ( i == readIndex )
116              System.out.print( " R   " ); // just read index
117           else
118              System.out.print( "     " ); // neither index
119        } // end for
120
121        System.out.println( "\n" );
122     } // end method displayState
123  } // end class CircularBuffer
Figure 23.14. CircularBufferTest sets up a producer/consumer application and instantiates producer and consumer threads.
 1  // Fig. 23.14: CircularBufferTest.java
 2  // Application shows two threads manipulating a circular buffer.
 3  import java.util.concurrent.ExecutorService;
 4  import java.util.concurrent.Executors;
 5
 6  public class CircularBufferTest
 7  {
 8     public static void main( String[] args )
 9     {
10        // create new thread pool with two threads
11        ExecutorService application = Executors.newFixedThreadPool( 2 );
12
13        // create CircularBuffer to store ints
14        Buffer sharedLocation = new CircularBuffer();
15
16        try // try to start producer and consumer
17        {
18           application.execute( new Producer( sharedLocation ) );
19           application.execute( new Consumer( sharedLocation ) );
20        } // end try
21        catch ( Exception exception )
22        {
23           exception.printStackTrace();
24        } // end catch
25
26        application.shutdown();
27     } // end main
28  } // end class CircularBufferTest
The significant changes to the example in Section 23.7 occur in CircularBuffer (Fig. 23.13), which replaces SynchronizedBuffer (Fig. 23.11). Line 10 creates a new ReentrantLock object and assigns its reference to Lock variable accessLock. The ReentrantLock is created without a fairness policy because we have only two threads in this example and only one will ever be waiting. Lines 13-14 create two Conditions using Lock method newCondition. Condition canWrite contains a queue for threads waiting while the buffer is full. If the buffer is full, the Producer calls method await on this Condition; when the Consumer frees space in a full buffer, it calls method signal on this Condition. Condition canRead contains a queue for threads waiting while the buffer is empty. If the buffer is empty, the Consumer calls method await on this Condition; when the Producer writes to the buffer, it calls method signal on this Condition. Array buffer (line 16) is a three-element integer array that represents the circular buffer. Variable occupiedBuffers (line 18) counts the number of elements in buffer that are filled with data available to be read. When occupiedBuffers is 0, there is no data in the circular buffer and the Consumer must wait; when occupiedBuffers is 3 (the size of the circular buffer), the circular buffer is full and the Producer must wait. Variable writeIndex (line 19) indicates the next location in which a value can be placed by a Producer. Variable readIndex (line 20) indicates the position from which the next value can be read by a Consumer.
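The difference between a lock created without a fairness policy and one created with it can be observed directly. The following minimal sketch uses the ReentrantLock constructor that takes a boolean argument and the method isFair, both of which belong to java.util.concurrent.locks.ReentrantLock. The class name FairnessDemo is a hypothetical name used only here.

```java
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch comparing the two ways to construct a ReentrantLock;
// the class name is hypothetical.
class FairnessDemo
{
   public static void main( String[] args )
   {
      // no-argument constructor: no fairness policy, as in line 10 of Fig. 23.13
      ReentrantLock defaultLock = new ReentrantLock();

      // passing true requests a fairness policy that favors the longest-waiting thread
      ReentrantLock fairLock = new ReentrantLock( true );

      System.out.println( defaultLock.isFair() ); // prints false
      System.out.println( fairLock.isFair() ); // prints true
   }
}
```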
CircularBuffer method set (lines 23-54) performs the same tasks that it did in Fig. 23.11, with a few modifications. The while loop at lines 31-35 determines whether the Producer must wait (i.e., all buffers are full). If so, line 33 indicates that the Producer is waiting to perform its task. Then line 34 invokes Condition method await to place the Producer thread in the waiting state on the canWrite condition variable. When execution eventually continues at line 37 after the while loop, the value written by the Producer is placed in the circular buffer at location writeIndex. Then line 40 updates writeIndex for the next call to CircularBuffer method set. This line is the key to the circularity of the buffer. When writeIndex is incremented past the end of the buffer, this line sets it to 0. Line 42 increments occupiedBuffers, because there is now at least one value in the buffer that the Consumer can read. Next, line 43 invokes method displayState to update the output with the value produced, the number of occupied buffers, the contents of the buffers and the current writeIndex and readIndex. Line 44 invokes Condition method signal to indicate that a Consumer thread waiting on the canRead condition variable (if there is a waiting thread) should transition to the runnable state. Line 52 releases accessLock by calling method unlock inside a finally block.
Method get (lines 57-92) of class CircularBuffer also performs the same tasks as it did in Fig. 23.11, with a few minor modifications. The while loop at lines 66-70 determines whether the Consumer must wait (i.e., all buffers are empty). If the Consumer thread must wait, line 68 updates the output to indicate that the Consumer is waiting to perform its task. Then line 69 invokes Condition method await to place the current thread in the waiting state on the canRead condition variable. When execution eventually continues at line 72 after a signal call from the Producer, readValue is assigned the value at location readIndex in the circular buffer. Then line 75 updates readIndex for the next call to CircularBuffer method get. This line and line 40 create the circular effect of the buffer. Line 77 decrements occupiedBuffers, because there is now at least one open position in the buffer in which the Producer thread can place a value. Line 78 invokes method displayState to update the output with the consumed value, the number of occupied buffers, the contents of the buffers and the current writeIndex and readIndex. Line 79 invokes Condition method signal to transition the thread waiting to write into the CircularBuffer object into the runnable state. Line 88 releases accessLock inside a finally block to guarantee that the lock is released. Then line 91 returns the consumed value to the calling method.
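The wait-and-signal pattern that set and get share can be condensed into a compact sketch with the output statements removed. The version below is not the chapter's CircularBuffer. The class MiniRing and its methods put, take and transferSum are hypothetical names, the methods propagate InterruptedException rather than printing a stack trace, and the lambda expression assumes Java 8 or later. The sketch sends the values 1 through 10 across a three-cell buffer and sums them on the receiving side.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Compact sketch of the same wait/signal pattern; all names are hypothetical.
class MiniRing
{
   private final Lock lock = new ReentrantLock();
   private final Condition canWrite = lock.newCondition();
   private final Condition canRead = lock.newCondition();
   private final int[] cells = new int[ 3 ];
   private int occupied = 0;
   private int writeIndex = 0;
   private int readIndex = 0;

   void put( int value ) throws InterruptedException
   {
      lock.lock();
      try
      {
         while ( occupied == cells.length )
            canWrite.await(); // buffer full: wait for a free cell

         cells[ writeIndex ] = value;
         writeIndex = ( writeIndex + 1 ) % cells.length; // wrap around
         occupied++;
         canRead.signal(); // a value is now available to read
      }
      finally
      {
         lock.unlock();
      }
   }

   int take() throws InterruptedException
   {
      lock.lock();
      try
      {
         while ( occupied == 0 )
            canRead.await(); // buffer empty: wait for a value

         int value = cells[ readIndex ];
         readIndex = ( readIndex + 1 ) % cells.length; // wrap around
         occupied--;
         canWrite.signal(); // a cell is now free to write
         return value;
      }
      finally
      {
         lock.unlock();
      }
   }

   // produce 1..count on a second thread, consume on the calling thread
   // and return the sum of the consumed values
   static int transferSum( int count )
   {
      MiniRing ring = new MiniRing();
      Thread producer = new Thread( () -> {
         try
         {
            for ( int i = 1; i <= count; i++ )
               ring.put( i );
         }
         catch ( InterruptedException exception )
         {
            Thread.currentThread().interrupt();
         }
      } );
      producer.start();

      int sum = 0;
      try
      {
         for ( int i = 0; i < count; i++ )
            sum += ring.take();

         producer.join();
      }
      catch ( InterruptedException exception )
      {
         Thread.currentThread().interrupt();
         throw new IllegalStateException( exception );
      }

      return sum;
   }

   public static void main( String[] args )
   {
      System.out.println( transferSum( 10 ) ); // prints 55
   }
}
```

Whichever thread runs faster, every value is written exactly once and read exactly once, so the sum does not depend on how the two threads happen to be scheduled.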
Method displayState (lines 95-122) outputs the state of the application. Lines 101-102 output the current buffers. Line 102 uses method printf with a %2d format specifier to print the contents of each buffer with a leading space if it is a single digit. Lines 109-119 output the current writeIndex and readIndex with the letters W and R respectively.
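The effect of the %2d format specifier can be checked on its own. The following minimal sketch formats a single digit, the initial value -1 and a two-digit value. The class FormatDemo and its method cell are hypothetical names, and the square brackets are added only to make the padding visible.

```java
// Minimal sketch of the %2d format specifier; the names are hypothetical.
class FormatDemo
{
   // format a value in a field that is at least two characters wide
   static String cell( int value )
   {
      return String.format( "%2d", value );
   }

   public static void main( String[] args )
   {
      System.out.println( "[" + cell( 7 ) + "]" ); // prints [ 7]
      System.out.println( "[" + cell( -1 ) + "]" ); // prints [-1]
      System.out.println( "[" + cell( 10 ) + "]" ); // prints [10]
   }
}
```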
Class CircularBufferTest (Fig. 23.14) contains the main method that launches the application. Line 11 creates the ExecutorService with two threads, and line 14 creates a CircularBuffer object and assigns its reference to Buffer variable sharedLocation. Lines 18-19 execute the Producer and Consumer. Line 26 calls method shutdown to end the application when the Producer and Consumer complete their tasks.
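Method shutdown does not interrupt tasks that have already been submitted; it stops the ExecutorService from accepting new tasks and lets the submitted ones finish. The following minimal sketch shows this with two trivial tasks in place of the Producer and Consumer. The class ShutdownDemo and its method runAndCount are hypothetical names, the call to awaitTermination is added here only so that the sketch can report a result, and the lambda expressions assume Java 8 or later.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of shutdown behavior; the class and method names are hypothetical.
class ShutdownDemo
{
   // run two short tasks, shut down and report how many tasks completed
   static int runAndCount()
   {
      ExecutorService application = Executors.newFixedThreadPool( 2 );
      AtomicInteger completed = new AtomicInteger();

      application.execute( () -> completed.incrementAndGet() );
      application.execute( () -> completed.incrementAndGet() );

      application.shutdown(); // no new tasks accepted; submitted tasks still run

      try
      {
         // wait for the submitted tasks to finish (up to five seconds)
         application.awaitTermination( 5, TimeUnit.SECONDS );
      }
      catch ( InterruptedException exception )
      {
         Thread.currentThread().interrupt();
      }

      return completed.get();
   }

   public static void main( String[] args )
   {
      System.out.println( runAndCount() ); // prints 2
   }
}
```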
Each time the Producer writes a value or the Consumer reads a value, the program outputs the action performed (a read or a write) along with the contents of the buffer and the location of the write index and read index. In this output, the Producer first writes the value 1. The buffer then contains the value 1 in the first slot and the value -1 (the default value) in the other two slots. The write index is updated to the second slot, while the read index stays at the first slot. Next, the Consumer reads 1. The buffer contains the same values, but the read index has been updated to the second slot. The Consumer then tries to read again, but the buffer is empty and the Consumer is forced to wait. Note that only once in this execution of the program was it necessary for either thread to wait.