If you have a situation where you're trying to process many tasks in parallel with Python, `multiprocessing.imap` may be a good option. If, in addition, you don't care about the order they get processed and returned in, you can use `imap_unordered`.
For a simple demonstration, let’s pretend we have 20 tasks, each taking somewhere between 1 and 4 seconds to complete.
```python
import multiprocessing as mp
import random
import time


def process(tup):
    idx, sec = tup
    time.sleep(sec)
    print(f"Worker {idx} done in {sec} seconds")
    return idx


def test_imap_unordered():
    tasks = [(i, random.randint(1, 4)) for i in range(20)]
    with mp.Pool(8) as pool:
        results = pool.imap_unordered(process, tasks)
        for r in results:
            print(f"Done with worker {r}")


if __name__ == "__main__":
    test_imap_unordered()
```
A benefit of these asynchronous iterators comes if the inputs to your processing function are large and you don't want to hold all of them in memory (as would occur if you used `multiprocessing.map`, which converts your input iterator to a list).
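For instance, the inputs can come straight from a generator. Here is a minimal sketch reusing the toy `process` function above; `generate_tasks` is a hypothetical lazy producer, not something from the original example:

```python
def generate_tasks(n):
    # Hypothetical lazy producer: yields one (index, seconds) tuple
    # at a time, so the caller never builds the full input list.
    for i in range(n):
        yield (i, random.randint(1, 4))


def test_lazy_inputs():
    with mp.Pool(8) as pool:
        # imap_unordered accepts any iterable; map would first
        # convert generate_tasks(20) into a list.
        for r in pool.imap_unordered(process, generate_tasks(20)):
            print(f"Done with worker {r}")
```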
However, a subtle issue can occur if you change the indentation above to
```python
with mp.Pool(8) as pool:
    results = pool.imap_unordered(process, tasks)
for r in results:
    print(f"Done with worker {r}")
```
This looks innocuous enough, especially if you're used to using `pool.map(...)`. However, the `results` object is an `IMapIterator`, which lazily works through the outputs as they come in. When an output is finished and consumed, it signals to the pool to process the next input.

The problem: when we exit the context manager block, the pool shuts down (a `Pool`'s `__exit__` calls `terminate()`) and the inputs stop getting processed. This causes a deadlock: nothing will actually get processed in the version where you dedent the `for r in results` loop.
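If you do want the loop outside the `with` block, one option (the same fix Claude suggests below, shown here against the toy example) is to drain the iterator into a list before leaving the block, at the cost of holding all results in memory:

```python
with mp.Pool(8) as pool:
    # Drain the iterator while the pool is still alive.
    results = list(pool.imap_unordered(process, tasks))
for r in results:
    print(f"Done with worker {r}")
```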
This subtle issue was pointed out to me by Claude 3 Opus. I'm recording this question and answer because this feels like another noticeable jump in the performance of LLMs: two very smart and experienced developers were looking at the code and didn't see what the issue was. While I'm sure they could have eventually figured it out, I solved the problem in < 5 minutes by asking the question with the `llm` command-line tool (reformatted for clarity):
```
$ llm chat -m claude-3-opus
Chatting with claude-3-opus-20240229
Type 'exit' or 'quit' to exit
Type '!multi' to enter multiple lines, then '!end' to finish
> !multi
I have some python code where the multiprocessing is hanging ONLY with imap_unordered:

# Create a pool and dispatch
with Pool(processes=worker_count, maxtasksperchild=1) as p:
    # mp_tasks = p.map(wrap_solve_mcf, uw_inputs(range(nruns)))  # runs fine!
    mp_tasks = p.imap_unordered(wrap_solve_mcf, uw_inputs(range(nruns)))  # FAILS!

# runs fine!
# p = Pool(processes=worker_count, maxtasksperchild=1)
# mp_tasks = p.imap_unordered(wrap_solve_mcf, uw_inputs(range(nruns)))
# p.close()

Why is that? what is different?
```
The answer from Claude was shockingly coherent and helpful (reproduced with formatting):
The difference between `map` and `imap_unordered` is in how they return the results and how the main process interacts with the worker processes.

1. `map`: `map` blocks until all the tasks are completed and returns the results as a list in the same order as the input iterable.
2. `imap_unordered`: `imap_unordered` returns an iterator that yields the results as soon as they are available, in an arbitrary order.

The issue with your code using `imap_unordered` is likely related to the fact that you are using a `with` statement to manage the pool. When the `with` block ends, the pool is automatically closed and joined, which means the main process waits for all the worker processes to finish.

However, with `imap_unordered`, the main process needs to actively consume the results from the iterator returned by `imap_unordered`. If you don't consume the results, the worker processes may hang waiting for the main process to retrieve the results.
To fix the issue, you can consume the results from the iterator returned by `imap_unordered`. Here's an example:

```python
with Pool(processes=worker_count, maxtasksperchild=1) as p:
    mp_tasks = p.imap_unordered(wrap_solve_mcf, uw_inputs(range(nruns)))
    results = list(mp_tasks)  # Consume the results from the iterator
```

By converting the iterator returned by `imap_unordered` to a list using `list(mp_tasks)`, you actively consume the results, allowing the worker processes to complete and the pool to be properly closed and joined.
Alternatively, you can use the commented-out code that you mentioned, where you manually manage the pool without using a `with` statement:

```python
p = Pool(processes=worker_count, maxtasksperchild=1)
mp_tasks = p.imap_unordered(wrap_solve_mcf, uw_inputs(range(nruns)))
results = list(mp_tasks)  # Consume the results from the iterator
p.close()
```
In this case, you explicitly close the pool after consuming the results from the iterator.
Remember to handle the results appropriately based on your requirements, whether you need them in a specific order or not.