Network Programming with Perl


 
By Lincoln D. Stein

Chapter 13. Nonblocking I/O

A Generic Nonblocking I/O Module

The IO::Getline module solves the problem of mixing select() with line-oriented I/O, but it doesn't help with the other problems of nonblocking I/O, such as partial writes. In this section we develop a more general solution to the problem of nonblocking I/O. The two modules developed here are called IO::SessionSet and IO::SessionData. The IO::SessionData class is a wrapper around an IO::Socket and has read() and write() methods that use IO::Socket syntax. However, the class enhances the basic socket by knowing how to manage partial writes and the EWOULDBLOCK error.

The IO::SessionSet class is analogous to IO::Select. It manages multiple IO::SessionData objects and allows you to multiplex among them. In addition to its IO::Select-like features, IO::SessionSet automatically calls accept() for listening sockets and adds the connected socket to the pool of managed handles. The code for these modules is regrettably complex. This is typical for applications that use nonblocking I/O because of the many exceptions and error conditions that must be handled.

Before plunging into the guts of the modules, let's look at a simple application that uses them.

A Nonblocking Echo Server

Figure 13.3 shows the code for a simple echo server that uses IO::SessionSet. This server simply echoes back all the data sent to it. [1]

[1] This server does not reverse lines, as previous echo server examples did, because it is byte stream rather than line-oriented. We discuss a line-oriented example in the next section.

Figure 13.3. An echo server that uses IO::SessionSet

Lines 1–4: Load modules. We begin by loading IO::SessionSet. IO::SessionSet loads IO::SessionData automatically.

Lines 5–9: Create listen socket. We create a listen socket in the normal way, dying in case of error.

Line 10: Create new IO::SessionSet object. We create an IO::SessionSet object by calling IO::SessionSet->new(), using the listen socket as its argument. This tells IO::SessionSet to perform an automatic accept() on the socket whenever a new client tries to connect.

Lines 11–13: Main loop. The rest of the server is about a dozen lines of code. The body of the server is an infinite loop. Each time through the loop, we call the IO::SessionSet object's wait() method, which returns a list of handles that are ready for reading. It is roughly equivalent to IO::Select's can_read() method, but returns a list of IO::SessionData objects rather than raw IO::Socket objects.

wait() handles the listening socket completely internally. If an incoming connection is detected, wait() calls the listen socket's accept() method, turns the returned connected socket into a new IO::SessionData object, and adds the object to its list of monitored sockets. This new session object is now returned to the caller along with any other IO::SessionData objects that are ready for I/O.

Internally, wait() also finishes partial writes that may have occurred during previous iterations of the loop. If no sessions are ready for reading, wait() blocks indefinitely.

Lines 14–21: Handle sessions. We now loop through each of the SessionData objects returned by wait() and handle each object in turn.

For each session object, we call its read() method, which returns up to 4K bytes of data into a local variable. If read() returns a true value, we immediately send the data to the session's write() method, writing it back to the client.

If read() returns a false value, we treat it as an end of file. We close the session by calling its close() method and continue looping.

Although IO::SessionData->read() looks and acts much like IO::Socket->read(), there is a crucial difference. Whereas the IO::Socket method will return either the number of bytes read or undef on failure, IO::SessionData->read() can also return 0E0 if the call would block, in the same way as the Getline module's getline() method.

In the main loop of Figure 13.3, we first test the result code in a logical if() statement. In this context, an EWOULDBLOCK result code is treated as a true value, telling us that no error occurred. Then, before we call write(), we treat the result code as a byte count and look to see whether it is greater than 0. In this case, 0E0 is used in a numeric context and so evaluates to a byte count of 0. We skip the write and try to read from the object later.
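The dual nature of 0E0 can be checked in isolation. The following is a minimal sketch, not the code of Figure 13.3; the describe() helper is a hypothetical name introduced only for illustration. It sorts a result code into the four cases discussed above by testing it first for definedness, then for truth, then as a number.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of IO::SessionData): classify a result
# code of the kind returned by the read() and write() methods.
sub describe {
    my $rc = shift;
    return 'error'       unless defined $rc;  # undef: I/O error
    return 'end of file' unless $rc;          # 0 is false: end of file
    return 'would block' unless $rc > 0;      # "0E0" is true but numerically zero
    return 'data';                            # positive byte count
}

my @results;
foreach my $rc (undef, 0, '0E0', 4096) {
    push @results, describe($rc);
}
print join(', ', @results), "\n";   # error, end of file, would block, data
```

Because "0E0" is a valid number in scientific notation, comparing it numerically does not trigger a warning even under `use warnings`.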

The IO::SessionData->write() method has the same syntax as IO::Socket->write(). The method sends as much of the data as it can, and buffers whatever data is left over from partial writes. The remainder of the queued data is written out automatically during subsequent calls to wait().

The write() method returns the number of bytes written on success, 0E0 if the operation would block, or undef on error. Since the vast majority of I/O errors encountered during writes are unrecoverable, write() also automatically closes the IO::SessionData object and removes it from the session set when it encounters an error. (If you don't like this, you can subclass IO::SessionData and override the method that does this.) Check $! to learn which specific error occurred.

Because it's possible that there is buffered outgoing data in the session at the time we call its close() method, the effect of close() may be delayed. Subsequent calls to wait() will attempt to send the remaining queued data in the SessionData object and only close the socket when the outgoing buffer is empty. However, even if there is buffered data left, close() immediately removes the session from the IO::SessionSet so that it is never returned.

Another important difference between IO::Socket and IO::SessionData is that IO::SessionData objects are not filehandles. You cannot directly call sysread() or syswrite() using a SessionData object as the target. You must always go through the read() and write() method calls.

A Nonblocking Line-Oriented Server

IO::SessionSet cannot itself handle line-oriented reads, but a subclass named IO::LineBufferedSet provides this ability. Figure 13.4 shows yet another iteration of the Eliza psychoanalyst server, rewritten to use this class.

Figure 13.4. A psychiatrist server using IO::LineBufferedSet

Lines 1–14: Initialize script. The script begins in much the way other members of the family do. The major difference is that we import the IO::LineBufferedSet module and create a new session set using this class.

Lines 15–17: Main loop. The main loop starts with a call to the session set's wait() method. This returns a list of SessionData objects that are ready for reading. Some of them are SessionData objects that we have seen on previous iterations of the loop; others are new sessions that were created when wait() called accept() on a new incoming connection.

Lines 18–23: Create new Chatbot objects. We distinguish between new and old sessions by consulting the %SESSIONS hash.

If this is a new incoming connection, then it lacks an entry in %SESSIONS, in which case we create a fresh Chatbot::Eliza::Polite object and store it into %SESSIONS indexed by the SessionData object. We call Eliza's welcome() method to get the greeting and pass it to the SessionData object's write() method, queuing the message to be written to the client.

Lines 24–30: Handle old sessions. If %SESSIONS indicates that this is a session we have seen before, then we retrieve the corresponding Eliza object.

We read a line of input by calling the SessionData's getline() method. This method acts like the IO::Getline->getline() method that we developed earlier, returning a result code that indicates the number of bytes read and placing the data into its scalar argument.

If the number of bytes read is positive, then we got a complete line. We remove the terminal newline, pass the user input to the Eliza object's one_line() method, and hand the result off to the session object's write() method.

Line 31: Close defunct sessions. If getline() returns a false value, then it indicates that the client has closed its end of the connection. We call the current session's close() method, removing it from the list of sessions monitored by the IO::LineBufferedSet object. We do the same in the case that the user terminated the session by typing "goodbye" or another exit word.

Just like IO::SessionData->read(), IO::LineBufferedSessionData->getline() returns 0 in case of end of file, 0E0 if the read would block, and undef for various error conditions.

Notice that we never explicitly check for the 0E0 result code on the reads. If getline() is unsuccessful, it returns a false value (0 for end of file and undef for an error). "Would block" is treated as a true value that just happens to result in a read of 0 bytes. The easiest strategy is to do nothing in this case and just go back to waiting for I/O in IO::SessionSet->wait().

Similarly, we don't check the result code from write(), because the SessionData object handles blocked write calls by queuing the data in an internal buffer and writing it bit by bit whenever the socket can accept it.

When IO::SessionData->read() is used in the way shown in these two examples, it is unlikely that it will ever return 0E0. This is because IO::SessionSet->wait() uses select() to ensure there will be at least 1 byte to read from any SessionData object it returns. The exception to this rule occurs when the SessionData object has just been created as the result of an incoming connection. In this case, there may very well be no data to read from it immediately. This is why we skip the getline() attempt when dealing with a new session (lines 19–23).

If you were to call the read() method several times without an intervening IO::SessionSet->wait() , the "would block" condition might very well occur. It is good practice to check that read() or getline() returns a positive byte count before trying to work with the data returned.

The IO::SessionData Module

Now that you've seen what these two modules do, we'll see how they work, starting with IO::SessionData. [2]

[2] These modules use many object-oriented tricks and other Perl idioms. If you find the code hard to follow, look at the implementation of the gab7.pl client in Chapter 16 (Figure 16.1). Although it uses IO::Poll rather than IO::Select, this code handles the problems of nonblocking I/O using the same strategy as the more general modules presented here.

IO::SessionData is a wrapper around a single IO::Socket object. In addition to the socket, it maintains an internal buffer called the outbuffer, which holds data that has been queued for sending but has not yet been sent across the socket. Other internal data includes a pointer to the SessionSet that manages the current SessionData object, a write-only flag, and some variables that manage what happens when the outgoing buffer fills up. IO::SessionData calls the associated SessionSet to tell it when it is ready to accept new data from the remote socket and when it has outgoing data to write.

Because the outgoing data is buffered, there is a risk of the outbuffer ballooning if the remote side stops reading data for an extended period. IO::SessionData deals with this problem by defining a choke() method that is called whenever the outbuffer exceeds its limit, and called again when the buffer returns to an acceptable size.

choke() is application specific. In some applications it might be appropriate to discard the extra buffered data, while in others the program might want to terminate the connection to the remote host. IO::SessionData allows the application to determine what choke() does by setting a callback subroutine that is invoked when outbuffer fills up. If no callback is set, then choke()'s default action is to flag the session so that it will no longer accept incoming data. When the write buffer returns to a reasonable size, the session is allowed to accept incoming data again. This is appropriate for many server applications in which the server reads some data from the session, processes it, and writes information back to the session.

IO::SessionData also allows you to create write-only sessions. This is designed to allow you to wrap write-only filehandles like STDOUT inside an IO::SessionData and use it in a nonblocking fashion. At the end of this chapter we give an example of how this works.

To summarize, the public API for IO::SessionData is as follows:

$bytes = $session->read($scalar,$length[,$offset])

Like sysread(), except that on EWOULDBLOCK errors, it returns 0E0.

$bytes = $session->write($scalar)

Like syswrite(), except that on EWOULDBLOCK errors, it returns 0E0.

$bytes = $session->pending

Returns the number of unsent bytes pending in outbuffer.

$bytes = $session->write_limit([$limit])

Gets or sets the write limit, which is the maximum number of unsent bytes that can be queued in outbuffer.

$coderef = $session->set_choke([$coderef])

Gets or sets a code reference to be invoked when outbuffer exceeds the write limit. The code will also be invoked when outbuffer returns to an allowed size.

$result = $session->close()

Closes the session, forbidding further reads. The actual filehandle will not be closed until all pending output data is written.

$fh = $session->handle()

Returns the underlying file handle.

$session_set = $session->sessions

Returns the associated IO::SessionSet.

Figure 13.5 gives the code for IO::SessionData.

Figure 13.5. The IO::SessionData module code

Lines 1–7: Initialize module. We begin by importing the EWOULDBLOCK constant from Errno and loading code from the IO::SessionSet module. We also define a constant default value for the maximum size of the outgoing buffer.

Lines 11–29: The new() method. The new() method constructs a new IO::SessionData object. This method is intended to be called from IO::SessionSet, not directly.

The new() method takes three arguments: the IO::SessionSet that's managing it, an IO::Handle object (typically an IO::Socket), and an optional flag that indicates whether the handle is to be treated as write-only. This last feature makes it possible to manage one-way filehandles such as STDOUT.

We put the handle into nonblocking mode by calling its blocking() method with an argument of 0 and set up our state variables in a hash reference. This reference is now blessed with the bless() function. The effect is that the reference is turned into an object that can invoke any of our methods. When our methods are invoked, the blessed reference is passed to us as the first argument. By convention, our methods store this object in a variable named $self.

Unless the handle is marked write-only, we now call our internal readable() method with a true argument to tell the associated IO::SessionSet that the handle is ready for reading. The object is returned to the caller.

Lines 30–46: The handle(), sessions(), pending(), and write_limit() methods. The next part of the module consists of a series of methods that provide access to the object's internal state. The handle() method returns the stored filehandle object; the sessions() method returns the associated IO::SessionSet object; pending() returns the number of bytes that are queued to be written; and write_limit() gets or sets the size limit on the outbuffer.

The code for write_limit() may look a bit cryptic, but it is a common Perl idiom for getting or setting a state variable in a Perl object. If the method is called with no arguments, then it returns the value of the write_limit state variable. Otherwise it uses the passed argument to update the value of write_limit.
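The get-or-set idiom can be sketched in a few lines. This is an illustration only, not the listing from Figure 13.5: the Demo::Session class is a hypothetical stand-in, and the starting limit of 3000 bytes is an arbitrary value chosen for the demonstration.

```perl
use strict;
use warnings;

package Demo::Session;   # hypothetical stand-in, not IO::SessionData itself

sub new {
    my $class = shift;
    # State lives in a hash reference that bless() turns into an object.
    return bless { write_limit => 3000 }, $class;   # 3000 is an arbitrary demo value
}

# Combined getter/setter: if an argument is passed, store it;
# in either case return the current value.
sub write_limit {
    my $self = shift;
    $self->{write_limit} = shift if @_;
    return $self->{write_limit};
}

package main;

my $session = Demo::Session->new;
my $default = $session->write_limit;         # get
my $updated = $session->write_limit(8192);   # set; returns the new value
print "$default -> $updated\n";              # 3000 -> 8192
```

The same shape works for any scalar state variable, which is why the text notes that set_choke() has an identical structure.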

Lines 47–51: The set_choke() method. The set_choke() method retrieves or sets the callback subroutine that is invoked whenever the outgoing buffer exceeds its limit. The structure of this method is identical to write_limit().

We expect to get a code reference as the argument, and a more careful implementation of this method would check that this is the case.

Lines 52–60: The write() method, queuing data. Now we come to the more interesting part of the module. The write() method is responsible for sending data over the handle. If part or all of the data can't be sent immediately, then it is queued in outbuffer for a later attempt.

write() can be called with just a single argument that contains data to be written, as in $session->write($data), or called with no arguments, as in $session->write(). In the latter case, the method tries to send any queued data it has from previous attempts.

We begin by recovering the object from the subroutine stack and sanity checking that the filehandle and outbuffer are defined. If these checks pass, and if the caller asked for more data to be queued for output, we append the new data to outbuffer. Notice that outbuffer is allowed to grow as large as the data to be passed to write(). The write limit only comes into play when marking the IO::SessionData object as ready for reading or writing additional data.

Lines 61–79: The write() method, writing data. The next section of the write() method tries to do I/O. If data is pending in the outbuffer, then we call syswrite() with the handle and the contents of outbuffer and save the result code. However, before calling syswrite(), we localize $SIG{PIPE} and set it to IGNORE. This prevents the program from getting a fatal signal if the filehandle is closed prematurely. After the method exits, the PIPE handler is automatically restored to its previous state so that this adjustment does not interfere with user code.

If syswrite() returns a defined result code, then it was at least partially successful, and the result code holds the number of bytes written. We use substr() to truncate the outbuffer by the number of bytes written. This might leave outbuffer empty if all bytes were written, or might leave it containing the unwritten remainder if syswrite() reported a partial write.

Otherwise, the result code is undef, indicating an error of some sort. We check the error code stored in $! and take appropriate action.

If the error code is EWOULDBLOCK, then we return 0E0. Otherwise, some other type of write error occurred, most likely a pipe error. We deal with this situation by deferring to an internal method named bail_out(). In the current implementation, bail_out() simply closes the handle and returns undef. To get more sophisticated behavior (such as logging or taking different actions depending on the error), create a subclass of IO::SessionData and override bail_out().

If we happen to be called when outbuffer is empty and there is no data to queue, then we just return 0E0. This won't ordinarily happen.

Finally, before we exit, we call an internal method named adjust_state() . This synchronizes the IO::SessionData object with the IO::SessionSet object that manages it. We finish by returning our result code.
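The core of the write logic described above (ignore SIGPIPE locally, call syswrite(), trim the buffer, map EWOULDBLOCK to 0E0) can be sketched as a standalone function. This is a simplified illustration under stated assumptions, not the module's write() method: flush_buffer() is a hypothetical name, there is no session set or bail_out() handling, and a Unix-domain socketpair stands in for a network connection.

```perl
use strict;
use warnings;
use Errno qw(EWOULDBLOCK);
use IO::Handle;
use Socket;

# Hypothetical helper: try to send the contents of $$bufref on $fh and
# remove whatever was actually written from the front of the buffer.
sub flush_buffer {
    my ($fh, $bufref) = @_;
    return '0E0' unless length $$bufref;   # nothing queued
    local $SIG{PIPE} = 'IGNORE';           # a closed peer gives an error, not a fatal signal
    my $rc = syswrite($fh, $$bufref);
    if (defined $rc) {
        substr($$bufref, 0, $rc) = '';     # keep only the unwritten remainder
        return $rc;
    }
    return '0E0' if $! == EWOULDBLOCK;     # socket full: try again later
    return undef;                          # unrecoverable write error
}

socketpair(my $left, my $right, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";
$left->blocking(0);

my $outbuffer = "hello, world\n";
my $sent      = flush_buffer($left, \$outbuffer);   # bytes written this time
my $idle      = flush_buffer($left, \$outbuffer);   # buffer now empty: "0E0"
sysread($right, my $received, 1024);
```

Because `local` restores the previous PIPE handler when the function returns, the caller's own signal handling is left untouched.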

Lines 80–90: The read() method. In contrast, the read() method is short. This method has the same syntax as Perl's built-in read() and sysread() functions. It is, in fact, a simple wrapper around sysread() that intercepts the result code and returns 0E0 on an EWOULDBLOCK error.

The only tricky feature is that we reference elements in the subroutine argument list directly (as $_[0], $_[1], etc.) rather than copy them into local variables. This allows us to pass these values directly to sysread() so that it can modify the caller's data buffer in place.
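A minimal sketch of such a wrapper follows. It is written as a plain function rather than a method, nb_read() is a hypothetical name, and a socketpair again stands in for a real connection. The point to notice is that the buffer argument is never copied: the elements of @_ are aliases for the caller's variables.

```perl
use strict;
use warnings;
use Errno qw(EWOULDBLOCK);
use IO::Handle;
use Socket;

# Hypothetical wrapper in the spirit of the read() method described above.
# After the shift, $_[0] is an alias for the caller's buffer, $_[1] is the
# length, and $_[2] is the optional offset.
sub nb_read {
    my $fh = shift;
    my $rc = sysread($fh, $_[0], $_[1], $_[2] || 0);
    return $rc   if defined $rc;          # byte count, or 0 at end of file
    return '0E0' if $! == EWOULDBLOCK;    # nothing to read yet
    return;                               # some other error: undef
}

socketpair(my $near, my $far, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";
$near->blocking(0);

my $buffer = '';
my $first  = nb_read($near, $buffer, 1024);   # no data yet, so "0E0"
syswrite($far, "abc");
my $second = nb_read($near, $buffer, 1024);   # 3, and $buffer now holds "abc"
```

Had the function copied its arguments with `my ($fh, $buf, $len) = @_`, sysread() would have filled the private copy and the caller's buffer would have stayed empty.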

Lines 91–102: The close() method. The close() method is responsible for closing our filehandle and cleaning up. There's a slight twist here because of the potential for pending data in the outgoing write buffer, in which case we can't close the filehandle immediately, but only mark it so that the true close happens after all pending data is written.

We call the pending() method to determine if there is still data in the write buffer. If not, then we immediately close the filehandle and alert the IO::SessionSet that manages this session to delete the object from its list. Otherwise, we flag this session as no longer readable by calling the readable() method with a false argument (we will see more of readable() later) and set a delayed close flag named closing.

Lines 103–116: The adjust_state() method. The next method, adjust_state(), is the way the session communicates with its associated IO::SessionSet.

We begin by calling two internal methods that are named writable() and readable(), which alert the IO::SessionSet that the session is ready to write data and read data, respectively. Our first step is to examine the outgoing buffer by calling the pending() method. If there is data there, we call our writable() method with a true flag to indicate that we have data to write.

Our second step is to call the choke() method if a nonzero write_limit has been defined. We pass choke() a true flag if the write buffer limit has been exceeded. The default choke() action is to disallow further reading on us by setting readable() to false.

Finally, if the closing flag is set, we attempt to close the session by invoking the close() method. This may actually close the session, or may just result in deferring the closing if there is pending outgoing data.

Lines 117–130: The choke() method. The next method is choke(), which is called when the amount of data in the outgoing buffer exceeds write_limit or when the amount of data in the buffer has shrunk to below the limit.

We begin by looking for a callback code reference. If one is defined, we invoke it, passing it a reference to the current SessionData object and a flag indicating whether the session should be choked or released from choke.

If no callback is defined, we simply call the session's readable() method with a false flag to disallow further input on this session until the write buffer is again an acceptable length.
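The choke logic can be illustrated with a small stand-in class. Everything here is hypothetical: Demo::Choker, its adjust() method, and its field names are not the module's own, and the sketch assumes that the callback fires only when the choked state actually changes. The callback receives the object and a flag that is true to choke and false to release.

```perl
use strict;
use warnings;

package Demo::Choker;   # hypothetical stand-in for the choke logic only

sub new {
    my ($class, %args) = @_;
    return bless {
        write_limit => $args{write_limit},
        callback    => $args{callback},   # optional code reference
        choked      => 0,
        readable    => 1,
    }, $class;
}

# Call with the current number of pending bytes after each write attempt.
sub adjust {
    my ($self, $pending) = @_;
    my $do_choke = $pending >= $self->{write_limit} ? 1 : 0;
    return if $do_choke == $self->{choked};       # assumption: act only on a change
    if (ref $self->{callback} eq 'CODE') {
        $self->{callback}->($self, $do_choke);    # the application decides what to do
    } else {
        $self->{readable} = $do_choke ? 0 : 1;    # default: stop or resume reading
    }
    $self->{choked} = $do_choke;
    return;
}

package main;

my @events;
my $choker = Demo::Choker->new(
    write_limit => 10,
    callback    => sub {
        my ($obj, $flag) = @_;
        push @events, $flag ? 'choke' : 'release';
    },
);
$choker->adjust($_) for (4, 12, 15, 3);   # pending byte counts over time
print "@events\n";                        # choke release

my $plain = Demo::Choker->new(write_limit => 10);   # no callback: default action
$plain->adjust(12);
```

A callback that discards data or closes the connection would slot in at the same point without any change to the surrounding logic.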

Lines 131–145: The readable() and writable() methods. The next two methods are readable() and writable(). They are front ends to the IO::SessionSet object's activate() method. As we will see in the next section, the first argument to activate() is the current IO::SessionData object; the second is one of the strings "read" or "write"; and the third is a flag indicating whether the indicated type of I/O should be activated or inactivated.

The only detail here is that if our session is flagged write-only, then readable() does not try to activate it.

Lines 146–157: The bail_out() method. The final method in the module is bail_out(), which is called when a write error occurs. In this implementation, bail_out() drops all buffered outgoing data and closes the session. The reason for dropping pending data is so that the close will occur immediately, rather than wait indefinitely for a write that we know is likely to fail.

bail_out() receives a copy of the error code that occurred during the unsuccessful write. The current implementation of this method ignores it, but you might wish to use the error code if you subclass IO::SessionData.

That's a lot of code! But we're not finished yet. The IO::SessionData module is only half of the picture. The other half is the IO::SessionSet module, which manages a set of nonblocking sessions.

The IO::SessionSet Module

IO::SessionSet is responsible for managing a set of IO::SessionData objects. It calls select() for sessions that are ready for I/O, calls accept() on the behalf of listening sockets, and arranges to call the write() method for each session with pending outgoing data.

The API for IO::SessionSet is straightforward, as follows.

$set = IO::SessionSet->new([$listen])

Creates a new IO::SessionSet. If a listen socket is provided in $listen, then the module automatically accepts incoming connections.

$session = $set->add($handle[,$writeonly])

Adds the filehandle to the list of handles monitored by the SessionSet. If the optional $writeonly flag is true, then the handle is treated as a write-only filehandle. This is suitable for STDOUT and other output-only filehandles. add() wraps the filehandle in an IO::SessionData object and returns the object as its result.

$set->delete($handle)

Deletes the filehandle or IO::SessionData object from the monitored set.

@sessions = $set->wait([$timeout])

Calls select() over the set of monitored filehandles and returns the corresponding sessions that are ready for reading. Incoming connections on the listen socket, if provided, are handled automatically, as are queued writes. If $timeout is provided, wait() returns an empty list if the timeout expires before any handles are ready for reading.

@sessions = $set->sessions()

Returns all the IO::SessionData objects that have been registered with this set.

Figure 13.6 lists IO::SessionSet.

Figure 13.6. IO::SessionSet

Lines 1–7: Initialize module. We begin by bringing in the necessary modules and by defining a global variable, $DEBUG, that may be set to enable verbose debugging. This facility was invaluable to me while I was developing this module, and you may be interested in activating it to see what exactly the module is doing.

To activate debugging, simply place the statement $IO::SessionSet::DEBUG=1 at the top of your program.

Lines 8–27: The new() constructor. The new() method is the constructor for this class. We define three state variables, each of which is a key in a blessed hash. One, named sessions, holds the set of sessions. The other two, readers and writers, hold IO::Select objects that will be used to select handles for reading and writing, respectively.

If the new() method was called with a listening IO::Socket object, then we store the socket in a fourth state variable and call IO::Select's add() method to add the listen socket to the list of handles to be monitored for reading. This allows us to make calls to accept() behind the scenes.

Lines 28–30: The sessions() method. The sessions() method returns the list of IO::SessionData objects that have been registered with this module. Because this class needs to interconvert between IO::SessionData objects and the underlying handles that they wrap around, the sessions state variable is actually a hash in which the keys are IO::Handle objects (typically sockets) and the values are the corresponding IO::SessionData wrappers. sessions() returns the values of the hash.

Lines 31–39: The add() method. The add() method is called to add a handle to the monitored set. It takes a filehandle and an optional write-only flag.

We call IO::SessionData->new() to create a new session object, and add the handle and its newly created session object to the list of handles the IO::SessionSet monitors. We then return the session object as our function result.

This method has one subtle feature. Because we want to be able to subclass IO::SessionData in the future, add() doesn't hard code the session class name. Instead it creates the session indirectly via an internal method named SessionDataClass(). This method returns the string that will be used as the session object class, in this case "IO::SessionData." To make IO::SessionSet use a different wrapper, subclass IO::SessionSet and override (redefine) the SessionDataClass() method. We use this feature in the line-oriented version of this module discussed in the next section.
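The indirection through a class-name method can be sketched with four tiny classes. All of the Demo:: names are hypothetical placeholders rather than the real modules; only the SessionDataClass() method name is taken from the text. The container asks itself which wrapper class to use, so a subclass changes the wrapper by overriding one method.

```perl
use strict;
use warnings;

package Demo::Session;          # hypothetical wrapper class
sub new {
    my ($class, $set, $handle) = @_;
    return bless { set => $set, handle => $handle }, $class;
}

package Demo::LineSession;      # hypothetical subclass of the wrapper
our @ISA = ('Demo::Session');

package Demo::Set;              # hypothetical container class
sub new              { return bless {}, shift }
sub SessionDataClass { return 'Demo::Session' }
sub add {
    my ($self, $handle) = @_;
    # The class name comes from a method call, so subclasses can change it.
    return $self->SessionDataClass->new($self, $handle);
}

package Demo::LineSet;          # overrides nothing but the class name
our @ISA = ('Demo::Set');
sub SessionDataClass { return 'Demo::LineSession' }

package main;

my $plain = Demo::Set->new->add(\*STDOUT);
my $lined = Demo::LineSet->new->add(\*STDOUT);
print ref($plain), " / ", ref($lined), "\n";   # Demo::Session / Demo::LineSession
```

Demo::LineSet inherits new() and add() unchanged, yet every session it creates is a Demo::LineSession.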

Lines 40–52: The delete() method. Next comes the delete() method, which removes a session from the list of monitored objects. In the interests of flexibility, this method accepts either an IO::SessionData object to delete or an IO::Handle. We call two internal methods, to_handle() and to_session(), to convert our argument into a handle or a session, respectively. We then remove all references to the handle and session from our internal data structures.

Lines 53–61: The to_handle() method. The to_handle() method accepts either an IO::SessionData object or an IO::Handle object. To distinguish these possibilities, we use Perl's built-in isa() method to determine whether the argument is a subclass of IO::SessionData. If this returns true, we call the object's handle() method to fetch its underlying filehandle and return it.

If isa() returns false, we test whether the argument is a filehandle by testing the return value of fileno(), and if so, return the argument unmodified. If neither test succeeds, we throw up our hands in despair and return undef.

Lines 62–70: The to_session() method. The to_session() method performs the inverse function. We check to see whether the argument is an IO::SessionData, and if so, return it unchanged. Otherwise, we test the argument with fileno(), and if it looks like a filehandle, we use it to index into our sessions hash, fetching the IO::SessionData object that corresponds to the handle.
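The two conversions can be sketched as plain functions over a hash keyed by handle. This is an illustration under stated assumptions rather than the code of Figure 13.6: Demo::Session is a hypothetical wrapper, %sessions is an ordinary lexical hash instead of an object field, and Scalar::Util's blessed() is added as a guard so that isa() is only called on objects.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);
use Socket;

package Demo::Session;   # hypothetical wrapper around a filehandle
sub new    { my ($class, $fh) = @_; return bless { handle => $fh }, $class }
sub handle { return $_[0]{handle} }

package main;

my %sessions;            # stringified handle => session wrapper

# Accept a session or a handle; always return the handle.
sub to_handle {
    my $thing = shift;
    return $thing->handle if blessed($thing) && $thing->isa('Demo::Session');
    return $thing         if defined fileno($thing);
    return;
}

# Accept a session or a handle; always return the session.
sub to_session {
    my $thing = shift;
    return $thing            if blessed($thing) && $thing->isa('Demo::Session');
    return $sessions{$thing} if defined fileno($thing);
    return;
}

socketpair(my $fh, my $peer, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";
my $session = Demo::Session->new($fh);
$sessions{$fh} = $session;           # the handle is stringified when used as a key

my $handle_from_session = to_handle($session);
my $session_from_handle = to_session($fh);
```

Hash keys are always strings, so the lookup works because the same handle always stringifies to the same value for as long as it exists.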

Lines 71–92: The activate() method. The activate() method is responsible for adding a handle to the appropriate IO::Select object when the handle's corresponding IO::SessionData object indicates that it wants to do I/O. The method can also be used to deactivate an active handle.

Our first argument is either an IO::SessionData object or a filehandle, so we begin with a call to to_handle() to turn the argument, whatever it is, into a filehandle. Our second argument is either of the strings "read" or "write." If it's "read," we operate on the readers IO::Select object. Otherwise, we operate on the writers object. The appropriate IO::Select object gets copied into a local variable.

Depending on whether the caller wants to activate or inactivate the handle, we either add or delete the filehandle to the IO::Select set. In either case, we return the previous activation setting for the filehandle.
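The add-or-remove step maps directly onto three IO::Select methods: exists(), add(), and remove(). The sketch below is a simplified function, not the activate() method itself; set_active() is a hypothetical name, and the choice between the readers and writers sets is left to the caller.

```perl
use strict;
use warnings;
use IO::Select;
use Socket;

# Hypothetical helper: add $fh to, or remove it from, an IO::Select set
# and report whether it was a member beforehand.
sub set_active {
    my ($select, $fh, $on) = @_;
    my $was_active = defined $select->exists($fh) ? 1 : 0;
    if ($on) { $select->add($fh) } else { $select->remove($fh) }
    return $was_active;
}

socketpair(my $one, my $two, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";

my $writers = IO::Select->new;
my $before_add    = set_active($writers, $one, 1);   # 0: not yet monitored
my $before_remove = set_active($writers, $one, 0);   # 1: it was monitored
my $remaining     = $writers->count;                 # 0 handles left in the set
```

Returning the previous state lets a caller restore it later without having to track it separately.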

Lines 93–110: The wait() method: handle pending writes. Finally we get to the guts of the module, the wait() method. Our job is to call IO::Select->select() for the handles whose sessions have declared them ready for I/O, to call write() for those sessions that have queued outgoing data, and to call accept() on the listening handle if the IO::Select object indicates that it is ready for reading. Any other filehandles that are ready for reading are used to look up the corresponding IO::SessionData objects and returned to the caller.

The first part of this subroutine calls IO::Select->select(), returning a two-element list of readers and writers that are ready for I/O. Our next task is to handle the writers with queued data. We now loop through each of the writable handles, finding its corresponding session and calling the session object's write() method to syswrite() as much pending data as it can. The IO::SessionData->write() method, as you recall, will remove itself from the list of writable handles when its outgoing buffer is empty.
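The class-method form of select() used here can be tried on its own. The following sketch is not the body of wait(); it uses a socketpair with hypothetical variable names to show what the call returns. IO::Select->select() in fact returns three array references (readable, writable, and handles with exceptional conditions); only the first two are captured, as in the description above.

```perl
use strict;
use warnings;
use IO::Select;
use Socket;

socketpair(my $client, my $server, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";

my $readers = IO::Select->new($server);   # handles we want to read from
my $writers = IO::Select->new($client);   # handles with queued output

# Arguments: read set, write set, exception set, timeout in seconds.
# On timeout the call returns an empty list, hence the guards below.
my ($can_read, $can_write) = IO::Select->select($readers, $writers, undef, 1);
my $readable_before = $can_read  ? scalar @$can_read  : 0;   # nothing sent yet
my $writable_before = $can_write ? scalar @$can_write : 0;   # $client can accept data

syswrite($client, "ping");
($can_read) = IO::Select->select($readers, undef, undef, 1);
my $readable_after = $can_read ? scalar @$can_read : 0;      # $server has data now
```

The handles come back as the same references that were added to the sets, which is what makes it possible to use them as keys into a sessions hash.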

Lines 111 “127: The wait() method: handle pending reads The next part of wait() deals with each of the readable filehandles returned by IO::Select->select() . If one of the readable filehandles is the listen socket, we call its accept() method to get a new connected socket and add this socket to our session set by invoking the add() method. The resulting IO::SessionData object is added to the list of readable sessions that we return to the caller.

If, on the other hand, the readable handle corresponds to any of the other handles, we look up its corresponding session and add it to the list of sessions to be returned to the caller.

Lines 128 “132: The SessionDataClass() method The last method is SessionDataClass() , which returns the name of the SessionData class that the add() method will create when it adds a filehandle to the session set. In this module, SessionDataClass() returns the string "IO::SessionData."

There's a small but subtle semantic inconsistency in IO::SessionSet->wait() . The new session that is created when an incoming connection comes in is returned to the caller regardless of whether it actually has data to read. This gives the caller a chance to write outgoing data to the handle ”for example, to print a welcome banner when the client connects.

If the caller invokes the new session object's read() method, it may have nothing to return. However, because the socket is nonblocking, this doesn't pose a practical problem. The read() method will return 0E0, and the caller should ignore the read and try again later.
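The value 0E0 works as a "nothing yet" signal because Perl treats the string as true in Boolean context but as zero in numeric context. The short sketch below shows how a caller might tell the outcomes apart. The helper name classify_read() is hypothetical, and it assumes the return conventions described in this chapter: undef on error, a false value at end of file, and 0E0 on EWOULDBLOCK.

```perl
use strict;
use warnings;

# Hypothetical helper: classify a read()-style return value.
sub classify_read {
    my ($bytes) = @_;
    return 'error'       unless defined $bytes;   # undef: I/O error
    return 'eof'         unless $bytes;           # 0: end of file
    return 'would block' unless $bytes > 0;       # "0E0": true, but numerically zero
    return 'data';                                # positive byte count
}

for my $result (undef, 0, '0E0', 42) {
    printf "%-5s => %s\n", defined $result ? $result : 'undef',
        classify_read($result);
}
```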

The IO::LineBufferedSet and IO::LineBufferedSessionData Classes

With some additional effort we can subclass the IO::SessionSet and IO::SessionData classes to make them handle line-oriented I/O, creating the IO::LineBufferedSet and IO::LineBufferedSessionData classes. IO::LineBufferedSet is backwards compatible with IO::SessionSet. You can treat the session objects it returns in a byte stream-oriented way, calling read() to retrieve arbitrary chunks of data. However, you can also use it in a line-oriented way, calling getline() to read data one line at a time.

IO::LineBufferedSet implements the following modified methods:

$set = IO::LineBufferedSet->new([$listen])

Creates a new IO::LineBufferedSet object. As in IO::SessionSet->new(), an optional listen socket will be monitored for incoming connections.

@sessions = $set->wait([$timeout])

As in IO::SessionSet->wait(), this method calls select() on the monitored filehandles and returns those sessions that are ready for reading. However, the returned sessions are IO::LineBufferedSessionData objects that support line-oriented I/O.

IO::LineBufferedSessionData provides all the methods of IO::SessionData, plus one:

$bytes = $session->getline($data)

Reads a line of data from the associated filehandle, placing it in $data and returning the length of the line. On end of file, it returns 0. On EWOULDBLOCK, it returns 0E0. On other I/O errors, it returns undef.

The code for these modules is essentially an elaboration of the simpler IO::Getline module that we discussed earlier in this chapter. Because it doesn't add much to what we have already learned, we won't walk through the code in detail. Appendix A shows the full code listing for these two modules.

As IO::Getline did, IO::LineBufferedSessionData uses a strategy of maintaining an internal buffer of data to hold partial lines. When its getline() method is called, we look here first for a full line of text. If one is found, then getline() returns it. Otherwise, getline() calls sysread() to add data to the end of the buffer and tries again.
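The buffering strategy can be sketched as follows. This is a minimal illustration, not the code from Appendix A: the LineBuffer class name, its hash fields, and the 4096-byte read size are assumptions, and for brevity an unterminated final line is discarded at end of file.

```perl
use strict;
use warnings;
use IO::Handle;
use Errno;

package LineBuffer;    # hypothetical class, for illustration only

sub new {
    my ($class, $fh) = @_;
    $fh->blocking(0);
    return bless { fh => $fh, buf => '' }, $class;
}

# Returns the line length, 0 at EOF, "0E0" if no full line is available
# yet, and undef on any other error. The line is placed in the caller's
# variable through the @_ alias.
sub getline {
    my $self = shift;
    my $pos  = index($self->{buf}, "\n");
    if ($pos < 0) {
        # No complete line buffered: append whatever the handle has.
        my $rc = sysread($self->{fh}, $self->{buf}, 4096, length $self->{buf});
        unless (defined $rc) {
            return $!{EWOULDBLOCK} || $!{EAGAIN} ? '0E0' : undef;
        }
        return 0 if $rc == 0;          # end of file
        $pos = index($self->{buf}, "\n");
        return '0E0' if $pos < 0;      # still only a partial line
    }
    $_[0] = substr($self->{buf}, 0, $pos + 1, '');
    return length $_[0];
}

package main;

pipe(my $reader, my $writer) or die "pipe: $!";
defined syswrite($writer, "one\ntwo\npart") or die "syswrite: $!";

my $lb = LineBuffer->new($reader);
my (@lines, $line);
while (my $n = $lb->getline($line)) {
    last unless $n > 0;                # "0E0": nothing more for now
    push @lines, $line;
}
print @lines;
```

The second call to getline() is satisfied entirely from the buffer, without touching the filehandle. The trailing "part" stays in the buffer until the rest of its line arrives.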

However, maintaining this internal buffer leads to the same problem that standard I/O has when used in conjunction with select(). The select() call may indicate that there is no new data to read from a handle when in fact there is a full line of text saved in the buffer. This means that we must modify our select() strategy slightly. This is done by IO::LineBufferedSet, a subclass of IO::SessionSet modified to work correctly with IO::LineBufferedSessionData. IO::LineBufferedSet overrides its parent's wait() method to look like this:

sub wait {
    my $self = shift;
    # look for old buffered data first
    my @sessions = grep {$_->has_buffered_data} $self->sessions;
    return @sessions if @sessions;
    return $self->SUPER::wait(@_);
}

The wait() method calls sessions() to return the list of session objects being monitored. It now filters this list by calling a new has_buffered_data() method, which returns true if the getline() method's internal data buffer contains one or more complete lines to read.

If there are sessions with whole lines to read, wait() returns them immediately. Otherwise, it falls back to the inherited version of wait() (by invoking its superclass's method, SUPER::wait()), which checks select() to see if any of the low-level filehandles has new data to read.
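The need for the buffer check is easy to reproduce. In the sketch below, which uses only core modules and a pipe as a stand-in for a socket, a single sysread() pulls two lines into a user-level buffer. After the first line is consumed, select() reports nothing to read even though a complete line is still waiting. The variable names are illustrative.

```perl
use strict;
use warnings;
use IO::Select;

pipe(my $reader, my $writer) or die "pipe: $!";
defined syswrite($writer, "one\ntwo\n") or die "syswrite: $!";

my $select = IO::Select->new($reader);
my $buffer = '';

# One sysread() drains both lines from the kernel into our own buffer.
defined sysread($reader, $buffer, 4096) or die "sysread: $!";
my $first = substr($buffer, 0, index($buffer, "\n") + 1, '');

# select() now sees an idle handle, although a full line is still buffered.
my @ready        = $select->can_read(0);
my $has_buffered = index($buffer, "\n") >= 0;

printf "select() ready handles: %d, buffered line waiting: %s\n",
    scalar @ready, $has_buffered ? 'yes' : 'no';
```

A loop that relied on select() alone would stall here until the peer sent more data, which is why the overridden wait() consults the buffers first.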

Using IO::SessionSet with Nonsocket Handles

To finish this section, we'll look at one last application of the IO::SessionSet module, a nonblocking version of the gab client. This works like the clients of the previous chapter but uses no forking or threading tricks to interweave input and output.

This client illustrates how to deal with handles that are unidirectional, like STDIN and STDOUT, and how to use the choke() callback to keep the size of the internal write buffer from growing without limit. The code is shown in Figure 13.7.

Figure 13.7. The gab6.pl script

Lines 1-8: Initialize script and process the command-line arguments We begin by bringing in the appropriate modules. To see status messages from IO::SessionSet as it manages the flow of data, try setting $IO::SessionSet::DEBUG to a true value.

Lines 9-13: Create IO::Socket and IO::SessionSet objects We create an IO::Socket::INET object connected to the indicated host and port, and invoke IO::SessionSet->new() to create a new SessionSet object. Unlike the previous examples, there's no listening socket for IO::SessionSet to monitor, so we don't pass any arguments to new().

We now add the connected socket to the session set by calling its add() method and do the same for the STDIN and STDOUT filehandles. Each of these calls returns an IO::SessionData object, which we store for later use.

When we add STDOUT to the session set, we use a true second argument, indicating that STDOUT is write only. This prevents the session set object from placing STDOUT on the list of handles it monitors for reading.

Lines 14-21: Set up choke() callbacks The next two statements set up customized callbacks for the choke() method. The first call to set_choke() installs a callback that disables reading from the socket when the STDOUT buffer is full. The second call installs a callback that disables reading from STDIN when the socket's output buffer is full. This behavior is more appropriate than IO::SessionSet's default, which works best when reading and writing to the same filehandle.

The callbacks themselves are anonymous subroutines. Each one is called by choke() with two arguments consisting of the current IO::SessionSet object and a flag indicating whether the session should be choked or unchoked.
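The idea behind a choke callback can be shown with a toy class. The sketch below is not IO::SessionData: ToySession, its enqueue() and drain() methods, and the 8-byte limit are all hypothetical, and the callback here receives the toy object itself as its first argument. It only illustrates the pattern of invoking a callback with a true flag when an output buffer fills and a false flag when it drains, so that some other handle can stop and resume reading.

```perl
use strict;
use warnings;

package ToySession;    # hypothetical class, for illustration only

sub new {
    my ($class, $limit) = @_;
    return bless { outbuf => '', limit => $limit, choked => 0,
                   on_choke => undef }, $class;
}

sub set_choke { my ($self, $callback) = @_; $self->{on_choke} = $callback }

# Queue outgoing data, then see whether the buffer has crossed the limit.
sub enqueue { my ($self, $data) = @_; $self->{outbuf} .= $data; $self->_check }

# Pretend $count bytes were written out, then re-check the buffer.
sub drain {
    my ($self, $count) = @_;
    substr($self->{outbuf}, 0, $count, '');
    $self->_check;
}

# Fire the callback only when the choked state actually changes.
sub _check {
    my $self = shift;
    my $full = length($self->{outbuf}) >= $self->{limit} ? 1 : 0;
    return if $full == $self->{choked};
    $self->{choked} = $full;
    $self->{on_choke}->($self, $full) if $self->{on_choke};
}

package main;

my $source_readable = 1;    # stands in for "keep reading from STDIN"
my @log;

my $out = ToySession->new(8);
$out->set_choke(sub {
    my ($session, $do_choke) = @_;
    $source_readable = $do_choke ? 0 : 1;
    push @log, $do_choke ? 'choked' : 'unchoked';
});

$out->enqueue('0123456789');    # buffer exceeds the limit: source is paused
$out->drain(10);                # buffer empties: source is resumed
print "@log\n";
```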

Lines 22-24: Begin main loop We enter the main I/O loop. In contrast with previous iterations of the gab client, we cannot quit immediately when we receive an EOF condition when reading from the connected socket. This is because we might still have queued data sitting in the socket or the STDOUT session waiting for the filehandle to be ready for writing.

Instead we quit only after all queued data bound for STDOUT and the socket has cleared and IO::SessionSet has removed them from the monitored set. We determine this by calling $set->sessions. If this returns a false value, then all queued data has been dealt with and the corresponding sessions have been removed from the SessionSet.

Line 25: Invoke wait() We invoke $set->wait() in order to wait for sessions to become ready for reading. This also handles pending writes. When wait() returns, we store the sessions that are ready for reading in an array.

Lines 26-36: Do I/O on sessions We loop over all the sessions that are ready for reading. If the socket is among them, we read some data from it and write to standard output. If we get an EOF during the read, we close() the socket, as well as the standard input and standard output filehandles. This signals to the module that we will perform no further I/O on any of these objects. However, the underlying filehandles will not be closed until subsequent calls to wait() have transmitted all queued data.

Notice that the idiom for writing to $stdout is:

$stdout->write($data) if $bytes > 0;

This is because $connection->read() may return 0E0 to indicate an EWOULDBLOCK error. In this case, $data won't contain new data and we shouldn't bother to write it. The numeric comparison handles this situation.

Lines 37-43: Copy from standard input to the socket If the ready handle returned by wait() is the $stdin IO::SessionData object, then we attempt to read some data from it and write the retrieved data to the socket.

If read() returns a false result, however, this indicates that STDIN has been closed. We proceed by calling the socket's shutdown() method to close the write side of the connection. This causes the remote server to see an end-of-file condition and shut down its side of the socket, causing $connection->read() to return a false result on a subsequent iteration of the loop. This is a similar strategy to previous versions of this client.
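The effect of this half-close can be seen in isolation with a socketpair. This sketch is independent of gab6.pl: $client and $server are illustrative names for the two ends of a local socket pair, and the built-in shutdown() is called with 1 to close only the writing direction.

```perl
use strict;
use warnings;
use Socket;

socketpair(my $client, my $server, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";

# The client sends its last data, then closes only its write side.
defined syswrite($client, "last words\n") or die "syswrite: $!";
shutdown($client, 1) or die "shutdown: $!";    # 1 = no more writes

# The server first receives the data, then sees end of file.
my ($data, $more) = ('', '');
sysread($server, $data, 1024);
my $eof = sysread($server, $more, 1024);       # 0 at end of file

# The other direction is still open, so the server can reply.
defined syswrite($server, "goodbye\n") or die "syswrite: $!";
my $reply = '';
sysread($client, $reply, 1024);

print "server got: $data";
print "server saw EOF\n" if defined $eof && $eof == 0;
print "client got: $reply";
```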

This version of gab is 45 lines long, compared with 28 lines for the forking version of Figure 10.3 and 27 lines for the multithreaded version of Figure 11.3. This might not seem to be a large increase in complexity, but it is supported by another 300 lines of code in the IO::SessionData and IO::SessionSet modules! This increase in size and complexity is typical of what happens when moving from a blocking, multithreaded, or multitasking architecture to a nonblocking single-threaded design.


   