Implementing a Distributed Queue
Credit: James Edward Gray II
Problem
You want to use a central server as a workhorse, queueing up requests from remote clients and handling them one at a time.
Solution
Here's a method that shares a Queue object with clients. Clients put job objects into the queue, and the server handles them by yielding them to a code block.
#!/usr/bin/ruby
# queue_server.rb

require 'thread'  # For Ruby's thread-safe Queue
require 'drb'

# Minimum acceptable paranoia level when sharing code!
# (On Ruby 3.0 and later, $SAFE is an ordinary global and this line has no effect.)
$SAFE = 1

def run_queue(url='druby://127.0.0.1:61676')
  queue = Queue.new  # Containing the jobs to be processed

  # Start up DRb with URI and object to share
  DRb.start_service(url, queue)
  puts 'Listening for connection…'
  while job = queue.deq
    yield job
  end
end
Have your server call run_queue, passing in a code block that handles a single job. Every time one of your clients puts a job into the server queue, the server passes the job into the code block. Here's a sample code block that can handle a fast-running job ("Report") or a slow-running job ("Process"):
run_queue do |job|
  case job['request']
  when 'Report'
    puts "Reporting for #{job['from']}… Done."
  when 'Process'
    puts "Processing for #{job['from']}…"
    sleep 3  # Simulate real work
    puts 'Processing complete.'
  end
end
If we get a couple of clients sending in requests, output might look like this:
$ ruby queue_server.rb
Listening for connection…
Processing for Client 1…
Processing complete.
Processing for Client 2…
Processing complete.
Reporting for Client 1… Done.
Reporting for Client 2… Done.
Processing for Client 1…
Processing complete.
Reporting for Client 2… Done.
…
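The server loop is written as "while job = queue.deq", so it ends as soon as a nil (or false) job comes out of the queue. The following is a minimal sketch of that behavior using a plain local Queue with no DRb involved. The helper name drain and the sample jobs are hypothetical, introduced only for illustration.

```ruby
require 'thread'  # harmless on modern Rubies, where Queue is built in

# Hypothetical helper: the same loop as run_queue, minus the DRb setup.
def drain(queue)
  while job = queue.deq
    yield job
  end
end

queue = Queue.new
queue.enq('request' => 'Report',  'from' => 'Client 1')
queue.enq('request' => 'Process', 'from' => 'Client 2')
queue.enq(nil)  # sentinel: a nil job makes the while condition false

handled = []
drain(queue) do |job|
  handled << "#{job['request']} for #{job['from']}"
end

puts handled
```

A dry run like this is one way to exercise a job-handling block before putting it behind a network service. It also suggests that a client could stop the server loop by enqueueing nil, which may or may not be what you want.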
Discussion
A client for the queue server defined in the Solution simply needs to connect to the DRb server and add a mix of "Report" and "Process" jobs to the queue. Here's a client that connects to the DRb server and adds 20 jobs to the queue at random:
#!/usr/bin/ruby
# queue_client.rb

require 'thread'
require 'drb'

# Get a unique name for this client
NAME = ARGV.shift or raise "Usage: #{File.basename($0)} CLIENT_NAME"

DRb.start_service
queue = DRbObject.new_with_uri("druby://127.0.0.1:61676")

20.times do
  queue.enq('request' => ['Report', 'Process'][rand(2)], 'from' => NAME)
  sleep 1  # simulating network delays
end
Everything from Recipe 16.10 applies here. The major difference is that Ruby ships with a thread-safe Queue. That saves us the trouble of building our own.
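To see what the thread-safe Queue buys you, here is a small local sketch in which three producer threads stand in for remote clients and a single consumer thread stands in for the server. It uses no DRb. The client names, the 'seq' key, and the nil sentinel are assumptions made for the example, not part of the recipe's code.

```ruby
require 'thread'

queue = Queue.new

# Three producer threads stand in for three remote clients.
producers = (1..3).map do |n|
  Thread.new do
    5.times { |i| queue.enq('from' => "Client #{n}", 'seq' => i) }
  end
end

# A single consumer takes jobs off the queue one at a time.
seen = []
consumer = Thread.new do
  while job = queue.deq  # deq blocks while the queue is empty
    seen << job
  end
end

producers.each(&:join)
queue.enq(nil)  # sentinel: tells the consumer to stop
consumer.join

puts "Handled #{seen.size} jobs"
```

No explicit locking appears anywhere: the producers call enq concurrently, and the consumer still receives every job exactly once, with each client's jobs in the order that client enqueued them.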
See Also
- Recipe 16.10