Running a Code Block on Many Objects Simultaneously
Problem
Rather than iterating over the elements of a data structure one at a time, you want to run some function on all of them simultaneously.
Solution
Spawn a thread to handle each element of the data structure.
Here's a simple equivalent of Enumerable#each that runs a code block against every element of a data structure simultaneously.[1] It returns the Thread objects it spawned so that you can pause them, kill them, or join them and wait for them to finish:
[1] Well, more or less. The thread for the first element will start running before the thread for the last element does.
module Enumerable
  def each_simultaneously
    threads = []
    each { |e| threads << Thread.new { yield e } }
    return threads
  end
end
Running the following high-latency code with Enumerable#each would take 15 seconds. With our new Enumerable#each_simultaneously, it takes only five seconds:
start_time = Time.now
threads = [7,8,9].each_simultaneously do |e|
  sleep(5) # Simulate a long, high-latency operation
  print "Completed operation for #{e}!\n"
end
threads.each { |t| t.join } # Wait for every thread before checking the clock
# Completed operation for 8!
# Completed operation for 7!
# Completed operation for 9!
Time.now - start_time # => 5.009334
Discussion
You can save time by doing high-latency operations in parallel, since it often means you pay the latency price only once. If you're doing nameserver lookups, and the nameserver takes five seconds to respond to a request, you're going to be waiting at least five seconds. If you need to do 10 nameserver lookups, doing them in series will take 50 seconds, but doing them all at once might only take 5.
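To make the arithmetic concrete, here is a minimal sketch that times the same work in series and in parallel. The slow_lookup method and the host names are hypothetical stand-ins for real nameserver queries, and the delay is shortened to 0.2 seconds so the comparison runs quickly:

```ruby
# Hypothetical stand-in for a high-latency call such as a nameserver lookup.
def slow_lookup(host)
  sleep(0.2)            # Simulated network latency
  "address-of-#{host}"  # Placeholder result, not a real address
end

hosts = %w[a.example b.example c.example d.example e.example]

# In series: the latency is paid once per host.
start = Time.now
serial_results = hosts.map { |h| slow_lookup(h) }
serial_time = Time.now - start

# In parallel: one thread per host, so the waits overlap.
start = Time.now
threads = hosts.map { |h| Thread.new { slow_lookup(h) } }
parallel_results = threads.map { |t| t.value } # value joins, then returns the block's result
parallel_time = Time.now - start

puts "serial:   %.2f seconds" % serial_time
puts "parallel: %.2f seconds" % parallel_time
```

The serial run should take roughly five times the simulated latency, while the parallel run should take roughly one. The exact figures depend on your machine and Ruby version.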
This technique can also be applied to the other methods of Enumerable. You could write a collect_simultaneously, a find_all_simultaneously, and so on. But that's a lot of methods to write. All the methods of Enumerable are based on each. What if we could just convince those methods to use each_simultaneously instead of each?
It would be too much work to replace all the existing methods of Enumerable, but we can swap out an individual Enumerable object's each implementation for another, by wrapping it in an Enumerable::Enumerator. Here's how it would work:
require 'enumerator'

array = [7, 8, 9]
simultaneous_array = array.enum_for(:each_simultaneously)
simultaneous_array.each do |e|
  sleep(5) # Simulate a long, high-latency operation
  print "Completed operation for #{e}!\n"
end
# Completed operation for 7!
# Completed operation for 9!
# Completed operation for 8!
That call to enum_for returns an Enumerable::Enumerator object. The Enumerator implements all of the methods of Enumerable as the original array would, but its each method uses each_simultaneously under the covers.
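The same mechanism works with any iteration method, not just a threaded one. As a small illustration that uses only built-in methods, wrapping an array with reverse_each gives an object whose Enumerable methods walk the elements in reverse (the variable names here are our own):

```ruby
array = [7, 8, 9]

# The enumerator's each delegates to array.reverse_each.
reversed = array.enum_for(:reverse_each)

doubled = reversed.collect { |x| x * 2 }        # => [18, 16, 14]
first_below_nine = reversed.find { |x| x < 9 }  # => 8
```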
Do we now have simultaneous versions of all the Enumerable methods? Not quite. Look at this code:
simultaneous_array.collect { |x| sleep 5; x * -1 } # => []
What happened? The collect method returns before the threads have a chance to complete their tasks. When we were using each_simultaneously on its own, this was a nice feature. Consider the following idealized code, which starts three infinite loops in separate threads and then goes on to other things:
[SSHServer, HTTPServer, IRCServer].each_simultaneously do |server|
  server.serve_forever
end
# More code goes here…
This is not such a good feature when we're calling an Enumerable method with a return value. We need an equivalent of each_simultaneously that doesn't return until all of the threads have run:
require 'enumerator'

module Enumerable
  def all_simultaneously
    if block_given?
      collect { |e| Thread.new { yield(e) } }.each { |t| t.join }
      self
    else
      enum_for :all_simultaneously
    end
  end
end
You wouldn't use this method to spawn infinite loops (they'd all spawn, but you'd never regain control of your code). But you can use it to create multithreaded versions of collect and other Enumerable methods:
array.all_simultaneously.collect { |x| sleep 5; x * -1 } # => [-7, -9, -8]
That's better, but the elements are in the wrong order: after all, there's no guarantee which thread will complete first. This doesn't usually matter for Enumerable methods like find_all, grep, or reject, but it matters a lot for collect. And each_with_index is simply broken:
array.all_simultaneously.each_with_index { |x, i| sleep 5; puts "#{i}=>#{x}" }
# 0=>8
# 0=>7
# 0=>9
Here are thread-agnostic implementations of Enumerable#collect and Enumerable#each_with_index, which will work on normal Enumerable objects, but will also work in conjunction with all_simultaneously:
module Enumerable
  def collect
    results = []
    each_with_index { |e, i| results[i] = yield(e) }
    results
  end

  def each_with_index
    i = -1
    each { |e| yield e, i += 1 }
  end
end
Now it all works:
array.all_simultaneously.collect { |x| sleep 5; x * -1 }
# => [-7, -8, -9]

array.all_simultaneously.each_with_index { |x, i| sleep 5; puts "#{i}=>#{x}" }
# 1=>8
# 0=>7
# 2=>9
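If redefining core Enumerable methods feels too invasive, another option for the collect case is to let each thread carry its own result. This is a sketch of an alternative rather than part of the recipe: the method borrows the collect_simultaneously name suggested earlier, and it relies on Thread#value, which waits for a thread to finish and returns the value of its block. Because the threads are stored in element order, the results come back in element order too:

```ruby
module Enumerable
  # Hypothetical helper: a threaded collect that preserves element order.
  def collect_simultaneously
    threads = collect { |e| Thread.new { yield e } }
    threads.collect { |t| t.value } # Thread#value joins, then returns the block's result
  end
end

results = [7, 8, 9].collect_simultaneously { |x| sleep(0.1); x * -1 }
# => [-7, -8, -9]
```

The trade-off is that this covers only collect. The enumerator approach above reaches every Enumerable method at once, at the cost of patching the ones that depend on ordering.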
See Also
- Recipe 7.9, "Looping Through Multiple Iterables in Parallel"