aqueue#
aqueue is an async task queue with live progress display.
You put items (tasks) in, and they get processed, possibly creating more items which get processed, and so on, until all items are completed. A typical use case would be to scrape a website.
Meanwhile, a nice visualization of the queue’s goings-on is displayed in the terminal.
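The "items that create more items until everything is done" idea can be sketched with nothing but the standard library. This is only an illustration of the concept using asyncio.Queue, not aqueue's own implementation; the worker and crawl functions, the item naming scheme, and the depth limit are all made up for the example.

```python
import asyncio


async def worker(queue: asyncio.Queue, done: list[str]) -> None:
    """Pop items forever; processing an item may put more items on the queue."""
    while True:
        name, depth = await queue.get()
        await asyncio.sleep(0)  # stand-in for real I/O such as an HTTP request
        if depth < 2:
            # "discover" more work: two children per item, down to depth 2
            for i in range(2):
                queue.put_nowait((f"{name}/{i}", depth + 1))
        done.append(name)
        queue.task_done()


async def crawl(num_workers: int = 2) -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    done: list[str] = []
    queue.put_nowait(("root", 0))
    workers = [asyncio.create_task(worker(queue, done)) for _ in range(num_workers)]
    await queue.join()  # returns once every enqueued item has been marked done
    for task in workers:
        task.cancel()  # the workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return done


if __name__ == "__main__":
    print(asyncio.run(crawl()))
```

Children are put on the queue before the parent calls task_done(), so the count of unfinished items never drops to zero while work remains.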
Note
aqueue, or any asynchronous framework, is only going to be helpful if you’re performing I/O-bound work.
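To see why the note above matters, here is a small standard-library sketch (not part of aqueue) that times simulated I/O waits run one after another versus concurrently. The fake_request helper and the delay values are assumptions chosen for illustration.

```python
import asyncio
import time


async def fake_request(delay: float) -> float:
    """Stand-in for an I/O wait such as an HTTP round trip."""
    await asyncio.sleep(delay)
    return delay


async def sequential(delays: list[float]) -> float:
    """Await each request in turn and return the elapsed seconds."""
    start = time.perf_counter()
    for delay in delays:
        await fake_request(delay)
    return time.perf_counter() - start


async def concurrent(delays: list[float]) -> float:
    """Await all requests at once and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(delay) for delay in delays))
    return time.perf_counter() - start


if __name__ == "__main__":
    delays = [0.05] * 4
    print("sequential:", asyncio.run(sequential(delays)))
    print("concurrent:", asyncio.run(concurrent(delays)))
```

The concurrent version finishes sooner because the waits overlap. CPU-bound work never yields to the event loop while it computes, so it would see no such benefit.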
Installation#
aqueue is a Python package hosted on PyPI. The recommended installation method is pip-installing into a virtual environment:
pip install aqueue
Getting Started#
There are two things you need to do to use aqueue:
Implement your Item subclasses.
Start your queue with one of those items.
Example#
If you had a hierarchy of items like this…
Then, you might process it with aqueue like this…
import aqueue


class RootItem(aqueue.Item):
    async def process(self) -> None:
        # display what we're doing in the worker status panel
        self.set_worker_desc("Processing RootItem")

        # make an HTTP request, parse it, etc
        ...

        # when you discover more items you want to process, enqueue them with
        # self.enqueue():
        for _ in range(3):
            self.enqueue(ChildItem())

    async def after_children_processed(self) -> None:
        # run this method when this Item and all other Items it enqueued are done
        print("All done!")


class ChildItem(aqueue.Item):
    # track the enqueueing and completion of these items in the overall panel
    track_overall: bool = True

    async def process(self) -> None:
        self.set_worker_desc("Processing ChildItem")
        # this child item has no further children to enqueue, so it never calls
        # self.enqueue()


if __name__ == "__main__":
    aqueue.run_queue(
        initial_items=[RootItem()],
        num_workers=2,
    )
Items#
Items are your units of work. They can represent whatever you’d like, such as parts of a website that you’re trying to scrape: an item for the index page, for subpages, for images, etc.
Each item must be an instance of a subclass of aqueue.Item. Most importantly, you must implement the aqueue.Item.process method, which defines the work of the item, such as making an HTTP request, parsing it, downloading something, etc.
Items can make other items to be processed later. To enqueue them, call enqueue from within the process method.
As a rule of thumb, you should make a new item class whenever you notice a one-to-many relationship. For example, “this one page has many images I want to download”.
- class aqueue.Item#
The abstract class for items. Each subclass should represent one unit of work in your problem domain.
- abstract async process()#
Do this item’s work. This method is called when an item is popped from the queue.
This method is required to be implemented.
import aqueue

class MyItem(aqueue.Item):
    async def process(self):
        self.set_worker_desc("MyItem is processing...")
        # do some work
        ...
        # enqueue more items
        self.enqueue(ChildItem())

class ChildItem(aqueue.Item):
    async def process(self):
        pass
The return value for this method is aliased by aqueue.ProcessRetVal.
- Return type:
- enqueue(item)#
Enqueue another item for processing by a worker later. If this method is called outside of the process() method, it will raise a RuntimeError.
- set_worker_desc(description)#
Set the text description for the worker that is processing this item. If this method is called outside of the process() method, it will raise a RuntimeError.
This method is a no-op if run_queue/async_run_queue is run with visualize=False.
- async after_children_processed()#
Do any kind of cleanup/finalization. This method is called after this item and all the child items enqueued by this item have returned from their process method calls.
The default implementation for this method is a no-op. Overriding it is optional.
import aqueue

class MyItem(aqueue.Item):
    async def after_children_processed(self):
        print(f"{self} and all its children have been processed")
- Return type:
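One way to picture when such a hook can fire is a per-item counter of outstanding process calls. The sketch below is a guess at the general bookkeeping, not aqueue's actual implementation; the Node class, its pending field, and the run function are hypothetical, and a single worker is simulated with a plain deque.

```python
from __future__ import annotations

from collections import deque


class Node:
    """Hypothetical stand-in for an item, with the children it will enqueue."""

    def __init__(self, name: str, children: tuple[Node, ...] = ()) -> None:
        self.name = name
        self.children = list(children)
        self.parent: Node | None = None
        # Outstanding process() calls: this node's own, plus one per enqueued child.
        self.pending = 1


def run(root: Node) -> list[str]:
    """Process nodes first-in-first-out and record the order of events."""
    events: list[str] = []
    queue: deque[Node] = deque([root])
    while queue:
        node = queue.popleft()
        events.append(f"process {node.name}")
        for child in node.children:  # "enqueue" each child
            child.parent = node
            node.pending += 1
            queue.append(child)
        # The node's processing has returned: settle it, then notify its parent.
        for waiting in (node, node.parent):
            if waiting is not None:
                waiting.pending -= 1
                if waiting.pending == 0:
                    events.append(f"after_children {waiting.name}")
    return events


if __name__ == "__main__":
    for event in run(Node("root", (Node("a"), Node("b")))):
        print(event)
```

In this sketch a parent's hook fires only after its own processing and that of every child it enqueued have finished, which mirrors the ordering described above.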
- track_overall: ClassVar[bool] = False#
Set this class variable to True to track it in the overall progress panel.
If True, when this item is enqueued, the overall progress total will increment, and, when this item is done processing, the overall progress completed will increment.
import aqueue

class MyItem(aqueue.Item):
    track_overall = True
- priority: ClassVar[int] = 0#
Set this class variable to indicate priority.
In priority queues, this number determines the ordering of how Item objects are popped from the queue for processing. Smaller numbers have higher priority.
import aqueue

class ImportantItem(aqueue.Item):
    priority = 1

class PettyItem(aqueue.Item):
    priority = 2
This attribute has no effect unless run_queue/async_run_queue is run with order="priority".
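The "smaller numbers first" rule can be illustrated with the standard library's heapq. This is a sketch of the ordering only, not how aqueue stores its queue; the Item classes here are local stand-ins rather than aqueue.Item, and breaking ties by insertion order is an assumption, since the text above does not say how equal priorities are handled.

```python
import heapq
import itertools


class Item:
    """Local stand-in for an item class with a class-level priority."""

    priority = 0


class ImportantItem(Item):
    priority = 1


class PettyItem(Item):
    priority = 2


def pop_order(items: list[Item]) -> list[str]:
    """Return class names in the order a min-heap would pop the items."""
    counter = itertools.count()  # tie-breaker so Item objects are never compared
    heap = [(item.priority, next(counter), item) for item in items]
    heapq.heapify(heap)
    order = []
    while heap:
        _, _, item = heapq.heappop(heap)
        order.append(type(item).__name__)
    return order


if __name__ == "__main__":
    print(pop_order([PettyItem(), ImportantItem(), PettyItem(), Item()]))
```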
- parent: Optional[Item]#
The Item object that enqueued this one, or None if it was an initial item.
This attribute is only valid inside process or after it has been called, such as in after_children_processed. This attribute should not be overwritten or mutated.
import aqueue

class MyItem(aqueue.Item):
    async def process(self):
        parent = self.parent
        if not parent:
            print("I'm an initial item")
        else:
            print(f"I was created by {parent}")
- aqueue.ProcessRetVal = collections.abc.AsyncIterator[aqueue.queue.Item] | None#
The type alias for the return value of aqueue.Item.process.
Starting your queue#
Once you’ve implemented some aqueue.Item classes, start your queue to kick things off.
- aqueue.run_queue(*, initial_items, num_workers=5, order='lifo', graceful_ctrl_c=True, visualize=True, async_backend='asyncio')#
Process all items in initial_items (and any subsequent items they enqueue) until they are complete. Meanwhile, display a terminal visualization of it if visualize is not False.
import aqueue

class MyItem(aqueue.Item):
    async def process(self):
        pass

aqueue.run_queue(
    initial_items=[MyItem()],
)
- Parameters:
initial_items (Iterable[Item]) – An iterable that seeds the queue. This is where the top-level item(s) that produce more items should go.
num_workers (int) – Specifies how many workers will be running concurrently.
Note
Setting a high num_workers does not automatically mean your program will run faster.
order (Literal['fifo', 'lifo', 'priority']) – Can be one of:
lifo for last-in-first-out processing, or depth-first. This is recommended for website scraping because it yields your deepest items fast, which are probably what you’re really after (versus fifo, which processes intermediate items first). This is the default.
fifo for first-in-first-out processing, or breadth-first.
priority for priority-based processing. In this case, processing will occur by ascending Item.priority (smallest first).
Note
priority only guarantees that workers will select items from the queue according to priority. It does not guarantee that items will be completed in any order.
The type for this argument is aliased by aqueue.Ordering.
graceful_ctrl_c (bool) – Specifies whether pressing Ctrl-C will stop things abruptly (False) or wait until all workers’ current items have finished their process() methods and their after_children_processed() methods (True, the default). The latter will give you more opportunity to clean up resources, write data, etc.
Warning
If you write a buggy item that never finishes, Ctrl-C will have no effect.
visualize (bool | VisOptions) – Set to True to draw a live progress meter display in the terminal while the queue is processed. Setting this to False does none of that.
Further, you may pass an aqueue.VisOptions object here, which acts just like True, but lets you configure some things about how the visualization is performed.
async_backend (Literal['trio', 'asyncio']) – Specifies the asynchronous backend framework with which to run the event loop. Generally speaking, you may not mix and match event loops and the async primitives run inside them. Therefore, this argument lets you choose which one to use.
The supported backends are those provided by AnyIO.
- Return type:
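The difference between the fifo and lifo orderings described for the order parameter can be sketched with a plain deque. This is an illustration under simplifying assumptions (a single worker and a made-up TREE hierarchy), not aqueue's scheduling code.

```python
from collections import deque

# A hypothetical site hierarchy: each key maps to the items it would enqueue.
TREE = {
    "index": ["page1", "page2"],
    "page1": ["img1a", "img1b"],
    "page2": ["img2a"],
}


def processing_order(order: str) -> list[str]:
    """Return the order in which one worker would process the items in TREE."""
    pending = deque(["index"])
    seen = []
    while pending:
        # lifo takes the newest item (depth-first); fifo takes the oldest
        item = pending.pop() if order == "lifo" else pending.popleft()
        seen.append(item)
        pending.extend(TREE.get(item, []))
    return seen


if __name__ == "__main__":
    print("fifo:", processing_order("fifo"))
    print("lifo:", processing_order("lifo"))
```

With lifo, the first image is reached after only two other items, while fifo works through every page before touching any image.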
- async aqueue.async_run_queue(*, initial_items, num_workers=5, order='lifo', graceful_ctrl_c=True, visualize=True)#
The coroutine of aqueue.run_queue. This function is useful if you want to bring your own event loop: either asyncio or trio event loops are usable with aqueue (those provided by AnyIO).
The parameters are the same as those of aqueue.run_queue, minus async_backend.
import asyncio
import aqueue

class MyItem(aqueue.Item):
    async def process(self):
        pass

asyncio.run(aqueue.async_run_queue(
    initial_items=[MyItem()],
))
- Return type:
- class aqueue.VisOptions(*, console=None)#
A container for settings when visualizing the queue.
Persisting state#
During development, it’s likely that your program will crash after doing some work. For example, maybe your HTTP request timed out or you had a bug in your HTML parsing. It would be a shame to lose the work that has already been done.
If you’re looking for a really handy way to persist state across runs, check out the built-in shelve module. It’s like a dict that automatically saves to a file each time you set a key in it.
import shelve

import aqueue

SHELF = shelve.open('my-shelf')


class MyItem(aqueue.Item):
    def __init__(self, url):
        self.url = url

    async def process(self):
        if self.url in SHELF:
            # already handled in a previous run
            return
        ...  # do work
        SHELF[self.url] = True


# run_queue is synchronous, so call it directly rather than through asyncio.run
aqueue.run_queue(
    initial_items=[MyItem("https://example.com")],  # placeholder URL
)
SHELF.close()
Otherwise, database inserts (such as sqlite3), manual file saving, etc. are fair game.
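The skip-if-already-done pattern can be tried without aqueue at all. The following standard-library sketch runs the same list of keys twice against one shelf kept in a temporary directory; the process_once, run, and demo helpers and the URL strings are invented for the example.

```python
import asyncio
import shelve
import tempfile
from pathlib import Path


async def process_once(shelf: shelve.Shelf, url: str, log: list[str]) -> None:
    """Do the (simulated) work for url unless the shelf says it is already done."""
    if url in shelf:
        log.append(f"skip {url}")
        return
    await asyncio.sleep(0)  # stand-in for the real I/O
    shelf[url] = True  # record completion so later runs can skip this url
    log.append(f"done {url}")


async def run(shelf_path: str, urls: list[str]) -> list[str]:
    log: list[str] = []
    with shelve.open(shelf_path) as shelf:  # closed (and flushed) on exit
        for url in urls:
            await process_once(shelf, url, log)
    return log


def demo() -> tuple[list[str], list[str]]:
    """Run twice against the same shelf file to mimic a restart."""
    with tempfile.TemporaryDirectory() as tmp:
        path = str(Path(tmp) / "my-shelf")
        urls = ["https://example.com/a", "https://example.com/b"]
        first = asyncio.run(run(path, urls))
        second = asyncio.run(run(path, urls))
    return first, second


if __name__ == "__main__":
    for log in demo():
        print(log)
```

Opening the shelf in a with block ensures it is closed even if the work raises, which is exactly the crash scenario this section is about.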
Other cool things#
The API is fully docstringed and type-hinted 🥳
Project Information#
License: MIT
Source Code: https://github.com/t-mart/aqueue
Documentation: https://t-mart.github.io/aqueue/
Supported Python Versions: 3.10 and later