Producer/Consumer Relationship: ArrayBlockingQueue

Producer Consumer Relationship ArrayBlockingQueue

J2SE 5.0 includes a fully-implemented circular buffer class named ArrayBlockingQueue in package java.util.concurrent, which implements the BlockingQueue interface. The BlockingQueue interface implements the Queue interface, discussed in Chapter 19 and declares methods put and take, the blocking equivalents of Queue methods offer and poll, respectively. This means that method put will place an element at the end of the BlockingQueue, waiting if the queue is full. Method take will remove an element from the head of the BlockingQueue, waiting if the queue is empty. Class ArrayBlockingQueue implements the BlockingQueue interface using an array. This makes the data structure fixed size, meaning that it will not expand to accommodate extra elements. Class ArrayBlockingQueue encapsulates all the functionality of our circular buffer class (Fig. 23.13).

The program in Fig. 23.15Fig. 23.16 demonstrates a Producer and a Consumer accessing a circular buffer (in this case, an ArrayBlockingQueue) with synchronization. Class BlockingBuffer implements interface Buffer (Fig. 23.15) and contains an ArrayBlockingQueue instance variable that stores Integer objects (line 7). By choosing to implement Buffer, our application can reuse the Producer (Fig. 23.7) and Consumer (Fig. 23.8) classes.

Figure 23.15. BlockingBuffer creates a blocking circular buffer using the ArrayBlockingQueue class.

1 // Fig. 23.15: BlockingBuffer.java 2 // Class synchronizes access to a blocking buffer. 3 import java.util.concurrent.ArrayBlockingQueue; 4 5 public class BlockingBuffer implements Buffer 6 { 7 private ArrayBlockingQueue buffer; 8 9 public BlockingBuffer() 10 { 11 buffer = new ArrayBlockingQueue( 3 ); 12 } // end BlockingBuffer constructor 13 14 // place value into buffer 15 public void set( int value ) 16 { 17 try 18 { 19 buffer.put( value ); // place value in circular buffer 20 System.out.printf( "%s%2d %s%d ", "Producer writes ", value, 21 "Buffers occupied: ", buffer.size() ); 22 } // end try 23 catch ( Exception exception ) 24 { 25 exception.printStackTrace(); 26 } // end catch 27 } // end method set 28 29 // return value from buffer 30 public int get() 31 { 32 int readValue = 0; // initialize value read from buffer 33 34 try 35 { 36 readValue = buffer.take(); // remove value from circular buffer 37 System.out.printf( "%s %2d %s%d ", "Consumer reads ", 38 readValue, "Buffers occupied: ", buffer.size() ); 39 } // end try 40 catch ( Exception exception ) 41 { 42 exception.printStackTrace(); 43 } // end catch 44 45 return readValue; 46 } // end method get 47 } // end class BlockingBuffer

Figure 23.16. BlockingBufferTest sets up a producer/consumer application using a blocking circular buffer.

(This item is displayed on pages 1086 - 1087 in the print version)

1 // Fig 23.16: BlockingBufferTest.java 2 // Application shows two threads manipulating a blocking buffer. 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 6 public class BlockingBufferTest 7 { 8 public static void main( String[] args ) 9 { 10 // create new thread pool with two threads 11 ExecutorService application = Executors.newFixedThreadPool( 2 ); 12 13 // create BlockingBuffer to store ints 14 Buffer sharedLocation = new BlockingBuffer(); 15 16 try // try to start producer and consumer 17 { 18 application.execute( new Producer( sharedLocation ) ); 19 application.execute( new Consumer( sharedLocation ) ); 20 } // end try 21 catch ( Exception exception ) 22 { 23 exception.printStackTrace(); 24 } // end catch 25 26 application.shutdown(); 27 } // end main 28 } // end class BlockingBufferTest  

Producer writes 1 Buffers occupied: 1 Consumer reads 1 Buffers occupied: 0 Producer writes 2 Buffers occupied: 1 Consumer reads 2 Buffers occupied: 0 Producer writes 3 Buffers occupied: 1 Consumer reads 3 Buffers occupied: 0 Producer writes 4 Buffers occupied: 1 Consumer reads 4 Buffers occupied: 0 Producer writes 5 Buffers occupied: 1 Consumer reads 5 Buffers occupied: 0 Producer writes 6 Buffers occupied: 1 Consumer reads 6 Buffers occupied: 0 Producer writes 7 Buffers occupied: 1 Producer writes 8 Buffers occupied: 2 Consumer reads 7 Buffers occupied: 1 Producer writes 9 Buffers occupied: 2 Consumer reads 8 Buffers occupied: 1 Producer writes 10 Buffers occupied: 2 Producer done producing. Terminating Producer. Consumer reads 9 Buffers occupied: 1 Consumer reads 10 Buffers occupied: 0 Consumer read values totaling 55. Terminating Consumer.  

Line 19 in method set (lines 1527) calls method put on the ArrayBlockingQueue. This method call will block until there is room in buffer to place value. Method get (lines 3046) of class BlockingBuffer calls method take (line 36) on the ArrayBlockingQueue. Again, this method call will block until there is an element in buffer to remove. Note that neither of these methods requires a Lock or Condition object. The ArrayBlockingQueue handles all of the synchronization for you. The amount of code in this program is greatly decreased from the previous circular buffer (from 123 lines to 47 lines) and is much easier to understand. This is an excellent example of encapsulation and software reuse.

Class BlockingBufferTest (Fig. 23.16) contains the main method that launches the application. Line 11 creates the ExecutorService, and line 14 creates a BlockingBuffer object and assigns its reference to Buffer variable sharedLocation. Lines 1819 execute the Producer and Consumer Runnables. Line 26 calls method shutdown to end the application when the Producer and Consumer finish.

In our prior synchronization examples, the output statements in the Buffer's set and get methods that indicated what the Producer was writing or the Consumer was reading were always executed while the Buffer's lock was held by the thread calling set or get. This guaranteed the order in which the output would be displayed. If the Consumer had the lock, the Producer could not execute the set methodtherefore, it was not possible for the Producer to output out of turn. The reverse was also true. In Fig. 23.15, methods set and get no longer use locksall locking is handled by the ArrayBlockingQueue. Because this class is from the Java API, we cannot modify it to perform output from its put and take methods. For these reasons, it is possible that the Producer and Consumer output statements in this example could print out of order. Even though the ArrayBlockingQueue is properly synchronizing access to the data, the output statements are no longer synchronized.

Категории