Producer/Consumer Relationship: Circular Buffer
Figures 15.9 and 15.10 use thread synchronization to guarantee that two threads manipulate data in a shared buffer correctly. However, the application may not perform optimally. If the two threads operate at different speeds, one will spend more (or most) of its time waiting. For example, in Fig. 15.9 we shared a single integer between the two threads. If the producer thread produces values faster than the consumer can consume those values, then the producer thread waits for the consumer, because there are no other locations in memory to place the next value. Similarly, if the consumer consumes faster than the producer can produce values, the consumer waits until the producer places the next value in the shared location in memory. Even when we have threads that operate at the same relative speeds, those threads over a period of time may become "out of sync," causing one of the threads to wait for the other. We cannot make assumptions about the relative speeds of concurrently executing threads. There are too many interactions that occur with the operating system, the network, the user and other components, which can cause the threads to operate at different speeds. When this happens, threads wait. When threads wait, programs become less productive, user-interactive programs become less responsive and network applications suffer longer delays because the processor is not used efficiently.
To minimize the waiting for threads that share resources and operate at the same relative speeds, we can implement a circular buffer that provides extra locations in which the producer can place values (if it "gets ahead" of the consumer) and from which the consumer can retrieve those values (if it "catches up" to the producer). Let us assume the buffer is implemented as an array. The producer and consumer work from the beginning of the array. When either thread reaches the end of the array, it simply returns to the first element of the array to perform its next task. If the producer temporarily produces values faster than the consumer can consume them, the producer can write additional values in the extra buffers (if cells are available; otherwise, the producer must, once again, wait). This enables the producer to perform its task even though the consumer is not ready to receive the current value being produced. Similarly, if the consumer consumes faster than the producer produces new values, the consumer can read additional values from the buffer (if there are any; otherwise, the consumer must, once again, wait) and thus "catch up" to the producer. This enables the consumer to perform its task even though the producer is not ready to produce additional values.
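The wrap-around behavior described above can be sketched without any threads. The following is a minimal illustration, not the chapter's listing: the class name RingSketch and the methods TryWrite and TryRead are hypothetical, and a false return value stands in for the cases in which a real producer or consumer would have to wait.

```csharp
using System;

// Hypothetical single-threaded sketch; RingSketch, TryWrite and TryRead
// are illustrative names that do not appear in the chapter's figures.
public class RingSketch
{
   private readonly int[] cells;
   private int count = 0;      // number of occupied cells
   private int readIndex = 0;  // next cell to read
   private int writeIndex = 0; // next cell to write

   public RingSketch( int capacity )
   {
      cells = new int[ capacity ];
   }

   // returns false when every cell is occupied (a producer would wait)
   public bool TryWrite( int value )
   {
      if ( count == cells.Length )
         return false;

      cells[ writeIndex ] = value;
      writeIndex = ( writeIndex + 1 ) % cells.Length; // wrap to 0 at the end
      ++count;
      return true;
   }

   // returns false when no cell is occupied (a consumer would wait)
   public bool TryRead( out int value )
   {
      value = 0;

      if ( count == 0 )
         return false;

      value = cells[ readIndex ];
      readIndex = ( readIndex + 1 ) % cells.Length; // wrap to 0 at the end
      --count;
      return true;
   }
}
```

With a capacity of three, three writes succeed and a fourth is refused until a read frees a cell; the next write then lands in the first element of the array again.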
Note that a circular buffer would be inappropriate if the producer and consumer consistently operate at different speeds. If the consumer always executes faster than the producer, then a buffer with one location is enough; additional locations would waste memory. If the producer always executes faster, a buffer with an infinite number of locations would be required to absorb the extra production.
The key to using a circular buffer is to define it with enough extra cells to handle the anticipated "extra" production. If, over a period of time, we determine that the producer often produces as many as three more values than the consumer can consume, we can define a buffer of at least three cells to handle the extra production. We do not want the buffer to be too small, because that would cause threads to wait more. On the other hand, we do not want the buffer to be too large, because that would waste memory.
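The sizing argument can be made concrete with a toy, tick-based model. This is only a sketch under simplifying assumptions: the class SizingSketch, the method CountProducerStalls and the two schedule arrays are hypothetical, each side attempts at most one operation per tick, and a blocked write is simply counted as a stall rather than retried.

```csharp
// Hypothetical deterministic model; none of these names come from the chapter.
public static class SizingSketch
{
   // produceReady[ t ] and consumeReady[ t ] say whether each side
   // attempts an operation at tick t (the arrays have equal length)
   public static int CountProducerStalls(
      int capacity, bool[] produceReady, bool[] consumeReady )
   {
      int occupied = 0; // cells currently holding unread values
      int stalls = 0;   // writes that found the buffer full

      for ( int t = 0; t < produceReady.Length; t++ )
      {
         if ( produceReady[ t ] )
         {
            if ( occupied < capacity )
               ++occupied; // room available, so the write succeeds
            else
               ++stalls;   // buffer full, so the producer would wait
         }

         if ( consumeReady[ t ] && occupied > 0 )
            --occupied;    // the consumer removes one value
      }

      return stalls;
   }
}
```

For a schedule in which the producer is ready at ticks 0 to 2 and the consumer only at ticks 3 to 5, this model reports two stalls with a capacity of one and none with a capacity of three, which matches the rule of thumb of sizing the buffer to the largest expected lead.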
Figures 15.11 and 15.12 demonstrate a producer and a consumer accessing a circular buffer (in this case, a shared array of three elements) with synchronization. In this version of the producer/consumer relationship, the consumer consumes a value only when the array is not empty, and the producer produces a value only when the array is not full. This example reuses interface Buffer (Fig. 15.4) and classes Producer (Fig. 15.5) and Consumer (Fig. 15.6). The statements that created and started the thread objects in the Main methods of class UnsynchronizedBufferTest in Fig. 15.8 and SynchronizedBufferTest in Fig. 15.10 now appear in class CircularBufferTest (Fig. 15.12).
Figure 15.11. CircularBuffer synchronizes access to a circular buffer containing three slots.
  1  // Fig. 15.11: CircularBuffer.cs
  2  // A circular shared buffer for the producer/consumer relationship.
  3  using System;
  4  using System.Threading;
  5
  6  // implement an array of shared integers with synchronization
  7  public class CircularBuffer : Buffer
  8  {
  9     // each array element is a buffer
 10     private int[] buffers = { -1, -1, -1 };
 11
 12     // occupiedBufferCount maintains count of occupied buffers
 13     private int occupiedBufferCount = 0;
 14
 15     private int readLocation = 0; // location of the next read
 16     private int writeLocation = 0; // location of the next write
 17
 18     // property Buffer
 19     public int Buffer
 20     {
 21        get
 22        {
 23           // lock this object while getting value
 24           // from buffers array
 25           lock ( this )
 26           {
 27              // if there is no data to read, place invoking
 28              // thread in WaitSleepJoin state
 29              if ( occupiedBufferCount == 0 )
 30              {
 31                 Console.Write( "\nAll buffers empty. {0} waits.",
 32                    Thread.CurrentThread.Name );
 33                 Monitor.Wait( this ); // enter the WaitSleepJoin state
 34              } // end if
 35
 36              // obtain value at current readLocation
 37              int readValue = buffers[ readLocation ];
 38
 39              Console.Write( "\n{0} reads {1} ",
 40                 Thread.CurrentThread.Name, buffers[ readLocation ] );
 41
 42              // just consumed a value, so decrement number of
 43              // occupied buffers
 44              --occupiedBufferCount;
 45
 46              // update readLocation for future read operation,
 47              // then add current state to output
 48              readLocation = ( readLocation + 1 ) % buffers.Length;
 49              Console.Write( CreateStateOutput() );
 50
 51              // return waiting thread (if there is one)
 52              // to Running state
 53              Monitor.Pulse( this );
 54
 55              return readValue;
 56           } // end lock
 57        } // end get
 58        set
 59        {
 60           // lock this object while setting value
 61           // in buffers array
 62           lock ( this )
 63           {
 64              // if there are no empty locations, place invoking
 65              // thread in WaitSleepJoin state
 66              if ( occupiedBufferCount == buffers.Length )
 67              {
 68                 Console.Write( "\nAll buffers full. {0} waits.",
 69                    Thread.CurrentThread.Name );
 70                 Monitor.Wait( this ); // enter the WaitSleepJoin state
 71              } // end if
 72
 73              // place value in writeLocation of buffers
 74              buffers[ writeLocation ] = value;
 75
 76              Console.Write( "\n{0} writes {1} ",
 77                 Thread.CurrentThread.Name, buffers[ writeLocation ] );
 78
 79              // just produced a value, so increment number of
 80              // occupied buffers
 81              ++occupiedBufferCount;
 82
 83              // update writeLocation for future write operation,
 84              // then add current state to output
 85              writeLocation = ( writeLocation + 1 ) % buffers.Length;
 86              Console.Write( CreateStateOutput() );
 87
 88              // return waiting thread (if there is one)
 89              // to Running state
 90              Monitor.Pulse( this );
 91           } // end lock
 92        } // end set
 93     } // end property Buffer
 94
 95     // create state output
 96     public string CreateStateOutput()
 97     {
 98        // display first line of state information
 99        string output = "(buffers occupied: " +
100           occupiedBufferCount + ")\nbuffers: ";
101
102        for ( int i = 0; i < buffers.Length; i++ )
103           output += " " + string.Format( "{0,2}", buffers[ i ] ) + "  ";
104
105        output += "\n";
106
107        // display second line of state information
108        output += "         ";
109
110        for ( int i = 0; i < buffers.Length; i++ )
111           output += "---- ";
112
113        output += "\n";
114
115        // display third line of state information
116        output += "         ";
117
118        // display readLocation (R) and writeLocation (W)
119        // indicators below appropriate buffer locations
120        for ( int i = 0; i < buffers.Length; i++ )
121        {
122           if ( i == writeLocation &&
123              writeLocation == readLocation )
124              output += " WR  ";
125           else if ( i == writeLocation )
126              output += " W   ";
127           else if ( i == readLocation )
128              output += "  R  ";
129           else
130              output += "     ";
131        } // end for
132
133        output += "\n";
134        return output;
135     } // end method CreateStateOutput
136  } // end class CircularBuffer
Figure 15.12. Producer and consumer threads accessing a circular buffer.
  1  // Fig. 15.12: CircularBufferTest.cs
  2  // Implementing the producer/consumer relationship with a
  3  // circular buffer.
  4  using System;
  5  using System.Threading;
  6
  7  class CircularBufferTest
  8  {
  9     // create producer and consumer threads and start them
 10     static void Main( string[] args )
 11     {
 12        // create shared object used by threads
 13        CircularBuffer shared = new CircularBuffer();
 14
 15        // Random object used by each thread
 16        Random random = new Random();
 17
 18        // display shared state before producer
 19        // and consumer threads begin execution
 20        Console.Write( shared.CreateStateOutput() );
 21
 22        // create Producer and Consumer objects
 23        Producer producer = new Producer( shared, random );
 24        Consumer consumer = new Consumer( shared, random );
 25
 26        // create threads for producer and consumer and set
 27        // delegates for each thread
 28        Thread producerThread =
 29           new Thread( new ThreadStart( producer.Produce ) );
 30        producerThread.Name = "Producer";
 31
 32        Thread consumerThread =
 33           new Thread( new ThreadStart( consumer.Consume ) );
 34        consumerThread.Name = "Consumer";
 35
 36        // start each thread
 37        producerThread.Start();
 38        consumerThread.Start();
 39     } // end Main
 40  } // end class CircularBufferTest
The most significant changes occur in class CircularBuffer (Fig. 15.11), which now contains four instance variables. Array buffers (line 10) is a three-element integer array that represents the circular buffer. Variable occupiedBufferCount is the condition variable that can be used to determine whether a producer can write to the circular buffer (i.e., occupiedBufferCount is less than the number of elements in array buffers) and whether a consumer can read from the circular buffer (i.e., occupiedBufferCount is greater than 0). Variable readLocation (line 15) indicates the position from which the next value can be read by a consumer. Variable writeLocation (line 16) indicates the next location in which a value can be placed by a producer.
The set accessor (lines 58-92) of property Buffer performs the same tasks that it did in Fig. 15.9, with a few modifications. Rather than using Monitor methods Enter and Exit to acquire and release the lock on the CircularBuffer object, we use a block of code preceded by keyword lock to lock the CircularBuffer object. As program control enters the lock block, the currently executing thread acquires the lock (assuming the lock is currently available) on the CircularBuffer object (i.e., this). When the lock block terminates, the thread releases the lock automatically.
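The relationship between a lock block and explicit Monitor calls can be sketched as follows. The class LockSketch and its two methods are hypothetical names used only for illustration; the second method shows roughly what the first one amounts to, with a try/finally ensuring that the lock is released even if the body throws an exception.

```csharp
using System;
using System.Threading;

// Hypothetical illustration; LockSketch is not part of the chapter's code.
public class LockSketch
{
   private int counter = 0;

   // acquire and release the lock with a lock block
   public int IncrementWithLock()
   {
      lock ( this )
      {
         return ++counter;
      } // lock released here automatically
   }

   // roughly equivalent form using Monitor methods directly
   public int IncrementWithMonitor()
   {
      Monitor.Enter( this );

      try
      {
         return ++counter;
      }
      finally
      {
         Monitor.Exit( this ); // runs even if the body throws
      }
   }
}
```

Both methods guard the same counter with the same lock, so calls to either one are mutually exclusive with calls to the other.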
The if statement at lines 66-71 in the set accessor determines whether the producer must wait (i.e., all buffers are full). If so, lines 68-69 output text indicating that the producer is waiting to perform its task, and line 70 invokes Monitor method Wait to place the producer thread in the WaitSleepJoin state. When execution continues at line 74 after the if statement, the value from the producer is placed in the circular buffer at location writeLocation. Next, lines 76-77 output a message containing the value produced. Line 81 increments occupiedBufferCount, because there is now at least one value in the buffer that the consumer can read. Then, line 85 updates writeLocation for the next call to the set accessor of property Buffer. The output continues at line 86 by invoking method CreateStateOutput (declared in lines 96-135), which outputs the number of occupied buffers, the contents of the buffers and the current writeLocation and readLocation. Finally, line 90 invokes Monitor method Pulse to indicate that a thread waiting on the CircularBuffer object (if there is a waiting thread) should transition to the Running state. Note that reaching the closing right brace of the lock block at line 91 causes the thread to release the lock on the CircularBuffer object.
The get accessor (lines 21-57) of property Buffer also performs the same tasks in this example that it did in Fig. 15.9, with a few minor modifications. Once again, we use a lock block to acquire and release the lock on the CircularBuffer object, rather than using Monitor methods Enter and Exit. The if statement at lines 29-34 in the get accessor determines whether the consumer must wait (i.e., all buffers are empty). If the consumer thread must wait, lines 31-32 indicate that the consumer is waiting to perform its task, and line 33 invokes Monitor method Wait to place the consumer thread in the WaitSleepJoin state. When execution continues at line 37 after the if statement, readValue is assigned the value at location readLocation in the circular buffer. Lines 39-40 output the value consumed. Line 44 decrements the occupiedBufferCount, because the buffer now contains one more position in which the producer thread can place a value. Then, line 48 updates readLocation for the next call to the get accessor of Buffer. Line 49 invokes method CreateStateOutput to output the number of occupied buffers, the contents of the buffers and the current writeLocation and readLocation. Finally, line 53 invokes method Pulse to transition the next thread waiting for the CircularBuffer object to the Running state, and line 55 returns the consumed value to the calling method.
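Testing the condition once with an if statement is adequate for the single producer and single consumer of this example. If several producers or consumers shared the buffer, a thread that is pulsed could find that another thread had already changed the condition, so a common variant rechecks the condition in a while loop. The following sketch shows that variant under stated assumptions; it is not the chapter's listing. The class BoundedBufferSketch, its methods Put and Take, the private gate object and the use of PulseAll are all illustrative choices.

```csharp
using System;
using System.Threading;

// Hypothetical variant; none of these names come from Fig. 15.11.
public class BoundedBufferSketch
{
   private readonly object gate = new object(); // private lock object
   private readonly int[] cells;
   private int count = 0;      // number of occupied cells
   private int readIndex = 0;  // next cell to read
   private int writeIndex = 0; // next cell to write

   public BoundedBufferSketch( int capacity )
   {
      cells = new int[ capacity ];
   }

   public void Put( int value )
   {
      lock ( gate )
      {
         // recheck the condition every time this thread is awakened
         while ( count == cells.Length )
            Monitor.Wait( gate );

         cells[ writeIndex ] = value;
         writeIndex = ( writeIndex + 1 ) % cells.Length;
         ++count;

         Monitor.PulseAll( gate ); // wake every waiter; each rechecks
      }
   }

   public int Take()
   {
      lock ( gate )
      {
         // recheck the condition every time this thread is awakened
         while ( count == 0 )
            Monitor.Wait( gate );

         int value = cells[ readIndex ];
         readIndex = ( readIndex + 1 ) % cells.Length;
         --count;

         Monitor.PulseAll( gate ); // wake every waiter; each rechecks
         return value;
      }
   }
}
```

PulseAll is used here because, with several threads of each kind waiting on one lock, a single Pulse might awaken a thread of the same kind that cannot proceed; awakening all waiters and letting each recheck its condition avoids that case at the cost of some extra wake-ups.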
In Fig. 15.12, line 13 now declares shared as a CircularBuffer object, and line 20 displays the initial state of the shared buffer space. The outputs for this example include the current occupiedBufferCount, the contents of the buffers and the current writeLocation and readLocation. In the output, the letters W and R represent the current writeLocation and readLocation, respectively. Notice that after the third value is placed in the third element of the buffer, the fourth value is inserted at the beginning of the array. This provides the circular buffer effect.