add files
This commit is contained in:
200
python-3.7.4-docs-html/_sources/library/asyncio-queue.rst.txt
Normal file
200
python-3.7.4-docs-html/_sources/library/asyncio-queue.rst.txt
Normal file
@@ -0,0 +1,200 @@
|
||||
.. currentmodule:: asyncio
|
||||
|
||||
.. _asyncio-queues:
|
||||
|
||||
======
|
||||
Queues
|
||||
======
|
||||
|
||||
asyncio queues are designed to be similar to classes of the
|
||||
:mod:`queue` module. Although asyncio queues are not thread-safe,
|
||||
they are designed to be used specifically in async/await code.
|
||||
|
||||
Note that methods of asyncio queues don't have a *timeout* parameter;
|
||||
use :func:`asyncio.wait_for` function to do queue operations with a
|
||||
timeout.
|
||||
|
||||
See also the `Examples`_ section below.
|
||||
|
||||
Queue
|
||||
=====
|
||||
|
||||
.. class:: Queue(maxsize=0, \*, loop=None)
|
||||
|
||||
A first in, first out (FIFO) queue.
|
||||
|
||||
If *maxsize* is less than or equal to zero, the queue size is
|
||||
infinite. If it is an integer greater than ``0``, then
|
||||
``await put()`` blocks when the queue reaches *maxsize*
|
||||
until an item is removed by :meth:`get`.
|
||||
|
||||
Unlike the standard library threading :mod:`queue`, the size of
|
||||
the queue is always known and can be returned by calling the
|
||||
:meth:`qsize` method.
|
||||
|
||||
This class is :ref:`not thread safe <asyncio-multithreading>`.
|
||||
|
||||
.. attribute:: maxsize
|
||||
|
||||
Number of items allowed in the queue.
|
||||
|
||||
.. method:: empty()
|
||||
|
||||
Return ``True`` if the queue is empty, ``False`` otherwise.
|
||||
|
||||
.. method:: full()
|
||||
|
||||
Return ``True`` if there are :attr:`maxsize` items in the queue.
|
||||
|
||||
If the queue was initialized with ``maxsize=0`` (the default),
|
||||
then :meth:`full()` never returns ``True``.
|
||||
|
||||
.. coroutinemethod:: get()
|
||||
|
||||
Remove and return an item from the queue. If queue is empty,
|
||||
wait until an item is available.
|
||||
|
||||
.. method:: get_nowait()
|
||||
|
||||
Return an item if one is immediately available, else raise
|
||||
:exc:`QueueEmpty`.
|
||||
|
||||
.. coroutinemethod:: join()
|
||||
|
||||
Block until all items in the queue have been received and processed.
|
||||
|
||||
The count of unfinished tasks goes up whenever an item is added
|
||||
to the queue. The count goes down whenever a consumer coroutine calls
|
||||
:meth:`task_done` to indicate that the item was retrieved and all
|
||||
work on it is complete. When the count of unfinished tasks drops
|
||||
to zero, :meth:`join` unblocks.
|
||||
|
||||
.. coroutinemethod:: put(item)
|
||||
|
||||
Put an item into the queue. If the queue is full, wait until a
|
||||
free slot is available before adding the item.
|
||||
|
||||
.. method:: put_nowait(item)
|
||||
|
||||
Put an item into the queue without blocking.
|
||||
|
||||
If no free slot is immediately available, raise :exc:`QueueFull`.
|
||||
|
||||
.. method:: qsize()
|
||||
|
||||
Return the number of items in the queue.
|
||||
|
||||
.. method:: task_done()
|
||||
|
||||
Indicate that a formerly enqueued task is complete.
|
||||
|
||||
Used by queue consumers. For each :meth:`~Queue.get` used to
|
||||
fetch a task, a subsequent call to :meth:`task_done` tells the
|
||||
queue that the processing on the task is complete.
|
||||
|
||||
If a :meth:`join` is currently blocking, it will resume when all
|
||||
items have been processed (meaning that a :meth:`task_done`
|
||||
call was received for every item that had been :meth:`~Queue.put`
|
||||
into the queue).
|
||||
|
||||
Raises :exc:`ValueError` if called more times than there were
|
||||
items placed in the queue.
|
||||
|
||||
|
||||
Priority Queue
|
||||
==============
|
||||
|
||||
.. class:: PriorityQueue
|
||||
|
||||
A variant of :class:`Queue`; retrieves entries in priority order
|
||||
(lowest first).
|
||||
|
||||
Entries are typically tuples of the form
|
||||
``(priority_number, data)``.
|
||||
|
||||
|
||||
LIFO Queue
|
||||
==========
|
||||
|
||||
.. class:: LifoQueue
|
||||
|
||||
A variant of :class:`Queue` that retrieves most recently added
|
||||
entries first (last in, first out).
|
||||
|
||||
|
||||
Exceptions
|
||||
==========
|
||||
|
||||
.. exception:: QueueEmpty
|
||||
|
||||
This exception is raised when the :meth:`~Queue.get_nowait` method
|
||||
is called on an empty queue.
|
||||
|
||||
|
||||
.. exception:: QueueFull
|
||||
|
||||
Exception raised when the :meth:`~Queue.put_nowait` method is called
|
||||
on a queue that has reached its *maxsize*.
|
||||
|
||||
|
||||
Examples
|
||||
========
|
||||
|
||||
.. _asyncio_example_queue_dist:
|
||||
|
||||
Queues can be used to distribute workload between several
|
||||
concurrent tasks::
|
||||
|
||||
import asyncio
|
||||
import random
|
||||
import time
|
||||
|
||||
|
||||
async def worker(name, queue):
|
||||
while True:
|
||||
# Get a "work item" out of the queue.
|
||||
sleep_for = await queue.get()
|
||||
|
||||
# Sleep for the "sleep_for" seconds.
|
||||
await asyncio.sleep(sleep_for)
|
||||
|
||||
# Notify the queue that the "work item" has been processed.
|
||||
queue.task_done()
|
||||
|
||||
print(f'{name} has slept for {sleep_for:.2f} seconds')
|
||||
|
||||
|
||||
async def main():
|
||||
# Create a queue that we will use to store our "workload".
|
||||
queue = asyncio.Queue()
|
||||
|
||||
# Generate random timings and put them into the queue.
|
||||
total_sleep_time = 0
|
||||
for _ in range(20):
|
||||
sleep_for = random.uniform(0.05, 1.0)
|
||||
total_sleep_time += sleep_for
|
||||
queue.put_nowait(sleep_for)
|
||||
|
||||
# Create three worker tasks to process the queue concurrently.
|
||||
tasks = []
|
||||
for i in range(3):
|
||||
task = asyncio.create_task(worker(f'worker-{i}', queue))
|
||||
tasks.append(task)
|
||||
|
||||
# Wait until the queue is fully processed.
|
||||
started_at = time.monotonic()
|
||||
await queue.join()
|
||||
total_slept_for = time.monotonic() - started_at
|
||||
|
||||
# Cancel our worker tasks.
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
# Wait until all worker tasks are cancelled.
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
print('====')
|
||||
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
|
||||
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
|
||||
|
||||
|
||||
asyncio.run(main())
|
Reference in New Issue
Block a user