Queue
Python provides several implementations of multi-consumer, multi-producer queues. They are perhaps the best way to communicate data between threads.
Three modules provide queues in Python:
queue.Queue: thread-safe queue
asyncio.Queue: not thread-safe; intended for use between coroutines
multiprocessing.Queue: thread-safe and process-safe queue
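To make the coroutine-oriented variant more concrete, here is a minimal sketch that passes items between two coroutines through an asyncio.Queue. The names producer, consumer, and main, and the use of None as an end marker, are assumptions made for this illustration.

```python
import asyncio

async def producer(q, n):
    # put() is a coroutine on asyncio.Queue, so it must be awaited
    for i in range(n):
        await q.put(i)
    await q.put(None)  # hypothetical end-of-stream marker

async def consumer(q):
    received = []
    while True:
        item = await q.get()  # suspends this coroutine, not the whole thread
        if item is None:
            break
        received.append(item)
    return received

async def main():
    q = asyncio.Queue()
    # Run both coroutines on the same event loop
    _, received = await asyncio.gather(producer(q, 3), consumer(q))
    return received

result = asyncio.run(main())
```

Because both coroutines run on one event loop in one thread, no locking is needed here; sharing the same queue object across threads would not be safe.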
Basic operations
Queues support put() and get() methods, both of which can be blocking.
All queues will block on get() if the queue is empty:
import queue

q1 = queue.Queue()
q1.put(10)
q1.get()  # returns 10
q1.get()  # blocks: the queue is now empty
Size-bounded queues will block on put() if the queue is full:
q2 = queue.Queue(2)  # at most 2 items
q2.put(10)
q2.put(3)
q2.put(8)  # blocks...
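Blocking forever is rarely what you want in practice. As a small sketch for queue.Queue, both put() and get() accept a timeout argument and raise queue.Full or queue.Empty when it expires. The flag names below are made up for this example.

```python
import queue

q = queue.Queue(maxsize=1)
q.put("a")  # the queue is now full

try:
    q.put("b", timeout=0.1)  # waits up to 0.1 s for a free slot
    put_timed_out = False
except queue.Full:
    put_timed_out = True

q.get()  # removes "a"; the queue is now empty

try:
    q.get(timeout=0.1)  # waits up to 0.1 s for an item
    get_timed_out = False
except queue.Empty:
    get_timed_out = True
```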
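The non-blocking put_nowait() and get_nowait() can be used instead of the blocking ones. Rather than waiting, they raise an exception (queue.Full or queue.Empty in the case of queue.Queue) when the operation cannot complete immediately.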
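One way to use the non-blocking calls with queue.Queue is to wrap them in small helpers that turn the exception into a return value. The helpers offer and poll below are hypothetical names for this sketch, not part of the queue module.

```python
import queue

def offer(q, item):
    """Try to enqueue item; return False instead of blocking when q is full."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False

def poll(q, default=None):
    """Return the next item, or default if q is currently empty."""
    try:
        return q.get_nowait()
    except queue.Empty:
        return default

q = queue.Queue(maxsize=2)
accepted = [offer(q, n) for n in range(4)]  # only the first two fit
first = poll(q)
```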
The queue's status can be checked with q.empty() and q.full(), but by the time you act on that information the queue's status might have changed again.
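The risky pattern is checking q.empty() and then calling q.get(): another consumer may take the last item between the two calls, leaving this thread blocked. A common alternative, sketched here for queue.Queue with a hypothetical helper named drain, is to attempt the operation and handle the exception.

```python
import queue

def drain(q):
    """Remove and return everything currently in q without ever blocking."""
    items = []
    while True:
        try:
            # No separate emptiness check: the attempt itself is the check
            items.append(q.get_nowait())
        except queue.Empty:
            return items

q = queue.Queue()
for n in range(3):
    q.put(n)

drained = drain(q)
```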
Completion
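Queues keep an internal count of unfinished tasks: each put() increments it and each q.task_done() call decrements it. If each item dequeued is eventually paired with a q.task_done() call, then you can wait for all tasks to be completed with q.join(). (multiprocessing.Queue does not have these two methods; multiprocessing.JoinableQueue does.)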
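The bookkeeping can be seen without any threads. In this sketch, q.join() returns immediately once every put() has a matching task_done(), and one task_done() call too many is rejected with a ValueError. The variable extra_call_rejected is made up for the example.

```python
import queue

q = queue.Queue()
q.put("a")
q.put("b")

# Pair each dequeued item with a task_done() call
q.get()
q.task_done()
q.get()
q.task_done()

q.join()  # does not block: nothing is left unfinished

try:
    q.task_done()  # more calls than items were ever put
except ValueError:
    extra_call_rejected = True
else:
    extra_call_rejected = False
```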
The example given in the queue documentation is:
def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()
Note the use of None as a tombstone here, telling the workers to stop processing items.
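If None can itself be a legitimate item, a unique object can serve as the tombstone instead. The following is a self-contained variation on the pattern, not the documentation's own code: STOP, NUM_WORKERS, and the doubling "work" are placeholders chosen for this sketch.

```python
import queue
import threading

NUM_WORKERS = 3   # hypothetical pool size
STOP = object()   # unique tombstone that cannot collide with real items

q = queue.Queue()
results = []
results_lock = threading.Lock()

def worker():
    while True:
        item = q.get()
        if item is STOP:  # identity check, so no real item can match
            break
        with results_lock:
            results.append(item * 2)  # stand-in for real work
        q.task_done()

threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for item in range(10):
    q.put(item)

q.join()  # block until every item has been marked done

# One tombstone per worker, then wait for the threads to exit
for _ in threads:
    q.put(STOP)
for t in threads:
    t.join()
```

The order of results depends on thread scheduling, so compare them as a sorted list or a set rather than relying on their sequence.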