aqueue

aqueue is an async task queue with live progress display.

You put items (tasks) in, and they get processed, possibly creating more items which get processed, and so on, until all items are completed. A typical use case would be to scrape a website.

Meanwhile, a nice visualization of the queue’s goings-on is displayed in the terminal.

[Image: Demonstration of aqueue]

Note

aqueue, or any asynchronous framework, is only going to be helpful if you’re performing I/O-bound work.

Installation

aqueue is a Python package hosted on PyPI. The recommended installation method is pip-installing into a virtual environment:

pip install aqueue

Getting Started

There are two things you need to do to use aqueue:

  1. Implement your Item subclasses.

  2. Start your queue with one of those items.

Example

If you had a hierarchy of items like this…

[Image: A simple item hierarchy with one root item and many child items stemming from it.]

Then, you might process it with aqueue like this…

import aqueue


class RootItem(aqueue.Item):
    async def process(self) -> None:
        # display what we're doing in the worker status panel
        self.set_worker_desc("Processing RootItem")

        # make an HTTP request, parse it, etc
        ...

        # when you discover more items you want to process, enqueue them with
        # self.enqueue():
        for _ in range(3):
            self.enqueue(ChildItem())

    async def after_children_processed(self) -> None:
        # this method runs once this Item and all the Items it enqueued are done
        print("All done!")


class ChildItem(aqueue.Item):

    # track the enqueueing and completion of these items in the overall panel
    track_overall: bool = True

    async def process(self) -> None:
        self.set_worker_desc("Processing ChildItem")
        # this child item has no further children to enqueue, so it doesn't call
        # self.enqueue()


if __name__ == "__main__":
    aqueue.run_queue(
        initial_items=[RootItem()],
        num_workers=2,
    )

Items

Items are your units of work. They can represent whatever you’d like, such as parts of a website that you’re trying to scrape: an item for the index page, for subpages, for images, etc.

Each item must be an instance of a subclass of aqueue.Item. At a minimum, you must implement the aqueue.Item.process method, which defines the work of the item, such as making an HTTP request, parsing it, downloading something, etc.

Items can create other items to be processed later. To enqueue them, call self.enqueue() from within the process method.

As a rule of thumb, you should make a new item class whenever you notice a one-to-many relationship. For example, “this one page has many images I want to download”.
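To make the "items create more items" idea concrete, here is a toy model written with only the standard library's asyncio. It is a sketch under stated assumptions, not aqueue's implementation: the `Page` class, the `fake_links` site map, and the `run` helper are hypothetical stand-ins for an item class, the website being scraped, and run_queue. One page has many links, so each page enqueues one child item per link.

```python
import asyncio

# Hypothetical site map: each page links to zero or more subpages.
fake_links = {
    "/": ["/a", "/b"],
    "/a": ["/a/1", "/a/2"],
    "/b": [],
    "/a/1": [],
    "/a/2": [],
}


class Page:
    """Stand-in for an item: one page to 'scrape'."""

    def __init__(self, path: str) -> None:
        self.path = path

    async def process(self, queue: asyncio.Queue) -> None:
        await asyncio.sleep(0)  # pretend to do I/O
        # one-to-many: enqueue a child item for every link on this page
        for link in fake_links[self.path]:
            queue.put_nowait(Page(link))


async def worker(queue: asyncio.Queue, done: list) -> None:
    while True:
        item = await queue.get()
        try:
            await item.process(queue)
            done.append(item.path)
        finally:
            queue.task_done()  # children were already queued, so join() keeps waiting


async def run(initial_items, num_workers: int = 2) -> list:
    queue: asyncio.Queue = asyncio.LifoQueue()
    for item in initial_items:
        queue.put_nowait(item)
    done: list = []
    workers = [asyncio.create_task(worker(queue, done)) for _ in range(num_workers)]
    # join() returns once every item, including enqueued children, is processed
    await queue.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return done


processed = asyncio.run(run([Page("/")]))
print(sorted(processed))
```

In this model `process` receives the queue explicitly; in aqueue, an item calls self.enqueue() instead and the library manages the queue and workers.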

class aqueue.Item

The abstract class for items. Each subclass should represent one unit of work in your problem domain.

abstract async process()

Do this item’s work. This method is called when an item is popped from the queue.

This method is required to be implemented.

import aqueue

class MyItem(aqueue.Item):
    async def process(self):
        self.set_worker_desc("MyItem is processing...")

        # do some work
        ...

        # enqueue more items
        self.enqueue(ChildItem())

class ChildItem(aqueue.Item):
    async def process(self):
        pass

The return value for this method is aliased by aqueue.ProcessRetVal.

Return type:

None

enqueue(item)

Enqueue another item for processing by a worker later. If this method is called outside of the process() method, it will raise a RuntimeError.

Parameters:

item (Item) – The item to enqueue.

Return type:

None

set_worker_desc(description)

Set the text description for the worker that is processing this item. If this method is called outside of the process() method, it will raise a RuntimeError.

This method is a no-op if run_queue/async_run_queue is run with visualize=False.

Parameters:

description (str) – The text that the worker display will show.

Return type:

None

async after_children_processed()

Do any kind of cleanup/finalization. This method is called after this item and all the child items enqueued by this item have returned from their process method calls.

The default implementation for this method is a no-op. Overriding it is optional.

import aqueue

class MyItem(aqueue.Item):
    async def after_children_processed(self):
        print(f"{self} and all its children have been processed")

Return type:

None
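The "this item and all its children are done" rule can be modeled with a per-item counter of unfinished work: each item starts at 1 (itself), enqueueing a child adds 1 to the parent, and whenever a counter reaches zero, that subtree is finished and the parent's counter drops by 1. This is a hypothetical, synchronous sketch of the semantics only; `Node`, `finish`, and `finalized` are illustrative names, not aqueue internals.

```python
from __future__ import annotations

from typing import Optional


class Node:
    """Stand-in for an item, tracking how much work under it is unfinished."""

    def __init__(self, name: str, parent: Optional[Node] = None) -> None:
        self.name = name
        self.parent = parent
        self.pending = 1  # this node's own process() has not finished yet
        if parent is not None:
            parent.pending += 1  # the parent now also waits on this child's subtree


finalized: list[str] = []


def finish(node: Optional[Node]) -> None:
    """Record that one unit of work under `node` is done, propagating upward."""
    while node is not None:
        node.pending -= 1
        if node.pending > 0:
            return  # still waiting on itself or on other descendants
        finalized.append(node.name)  # where after_children_processed would run
        node = node.parent  # this whole subtree is done; notify the parent


# root enqueues a and b while processing; a enqueues a1 while processing
root = Node("root")
a = Node("a", root)
b = Node("b", root)
a1 = Node("a1", a)

finish(root)  # root.process() returns, but its children are still pending
finish(a)
finish(b)     # b has no children, so it is finalized right away
finish(a1)    # finishing a1 completes a's subtree, then root's

print(finalized)
```

Note that the root is finalized last even though its own process() finished first.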

track_overall: ClassVar[bool] = False

Set this class variable to True to track items of this class in the overall progress panel.

If True, when this item is enqueued, the overall progress total will increment, and, when this item is done processing, the overall progress completed will increment.

import aqueue

class MyItem(aqueue.Item):
    track_overall = True

priority: ClassVar[int] = 0

Set this class variable to indicate priority.

In priority queues, this number determines the order in which Item objects are popped from the queue for processing. Smaller numbers have higher priority.

import aqueue

class ImportantItem(aqueue.Item):
    priority = 1

class PettyItem(aqueue.Item):
    priority = 2

This attribute has no effect unless run_queue/async_run_queue is run with order="priority".
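"Smaller numbers first" is the same rule as Python's heapq min-heap. The sketch below is a stdlib model, not aqueue's own code: it orders plain stand-in classes by their priority attribute and, as an assumption of this sketch, breaks ties with an insertion counter so equal-priority items never have to be compared directly.

```python
import heapq
import itertools


class ImportantItem:
    priority = 1


class PettyItem:
    priority = 2


heap: list = []
counter = itertools.count()  # tie-breaker: preserves insertion order among equals

for item in [PettyItem(), ImportantItem(), PettyItem(), ImportantItem()]:
    heapq.heappush(heap, (item.priority, next(counter), item))

popped = []
while heap:
    _, _, item = heapq.heappop(heap)  # always the smallest priority left
    popped.append(type(item).__name__)

print(popped)
```

Both ImportantItem objects come out before either PettyItem, regardless of the order they were pushed.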

parent: Optional[Item]

The Item object that enqueued this one, or None if it was an initial item.

This attribute is only valid inside process or after it has been called, such as in after_children_processed. This attribute should not be overwritten or mutated.

import aqueue

class MyItem(aqueue.Item):
    async def process(self):
        parent = self.parent
        if parent is None:
            print("I'm an initial item")
        else:
            print(f"I was created by {parent}")
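Because each item records the item that enqueued it, you can follow parent links back to the initial item, for example to compute an item's depth or log its lineage. The `ancestors` helper below is hypothetical and relies only on objects having a parent attribute that is None for initial items; `Stub` stands in for aqueue.Item so the sketch runs on its own. With real items, call it from process or after_children_processed, where parent is valid.

```python
from __future__ import annotations

from collections.abc import Iterator
from typing import Optional


class Stub:
    """Stand-in for an aqueue.Item: only the `parent` attribute matters here."""

    def __init__(self, name: str, parent: Optional[Stub] = None) -> None:
        self.name = name
        self.parent = parent


def ancestors(item: Stub) -> Iterator[Stub]:
    """Yield item.parent, then its parent, and so on up to the initial item."""
    node = item.parent
    while node is not None:
        yield node
        node = node.parent


index = Stub("index")            # an initial item: parent is None
subpage = Stub("subpage", index)
image = Stub("image", subpage)

print([a.name for a in ancestors(image)])
print(len(list(ancestors(image))))  # the item's depth below the initial item
```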

aqueue.ProcessRetVal = collections.abc.AsyncIterator[aqueue.queue.Item] | None

A type alias (a PEP 604 union type) for the return value of aqueue.Item.process.

Starting your queue

Once you’ve implemented some aqueue.Item classes, start your queue to kick things off.

aqueue.run_queue(*, initial_items, num_workers=5, order='lifo', graceful_ctrl_c=True, visualize=True, async_backend='asyncio')

Process all items in initial_items (and any subsequent items they enqueue) until they are complete. Meanwhile, display a terminal visualization of it if visualize is not False.

import aqueue

class MyItem(aqueue.Item):
    async def process(self):
        pass

aqueue.run_queue(
    initial_items=[MyItem()],
)

Parameters:
  • initial_items (Iterable[Item]) – An iterable that seeds the queue. This is where the top-level item(s) that produce more items should go.

  • num_workers (int) –

    Specifies how many workers will be running concurrently.

    Note

    Setting a high num_workers does not automatically mean your program will run faster.

  • order (Literal['fifo', 'lifo', 'priority']) –

    Can be one of:

    • lifo for last-in-first-out processing, or depth-first. This is recommended for website scraping because it reaches your deepest items quickly, which are probably what you’re really after (whereas fifo processes intermediate items first). This is the default.

    • fifo for first-in-first-out processing, or breadth-first.

    • priority for priority-based processing. In this case, processing will occur by ascending Item.priority (smallest first).

    Note

    priority only guarantees that workers will select items from the queue according to priority. It does not guarantee that items will be completed in any order.

    The type for this argument is aliased by aqueue.Ordering.

  • graceful_ctrl_c (bool) –

    Specifies whether pressing Ctrl-C will stop things abruptly (False) or wait until all workers’ current items have finished their process() methods and their after_children_processed() methods (True, the default). The latter gives you more opportunity to clean up resources, write data, etc.

    Warning

    If you write a buggy item that never finishes, Ctrl-C will have no effect.

  • visualize (bool | VisOptions) –

    Set to True to draw a live progress meter display in the terminal while the queue is processed. Setting this to False does none of that.

    Further, you may pass an aqueue.VisOptions object here, which acts just like True, but lets you configure some things about how the visualization is performed.

  • async_backend (Literal['trio', 'asyncio']) –

    Specifies the asynchronous backend framework with which to run the event loop. Generally speaking, you cannot mix and match event loops and the async primitives that run inside them, so this argument lets you choose which one to use.

    The supported backends are those provided by AnyIO.

Return type:

None
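To see why lifo behaves depth-first and fifo breadth-first, the stdlib sketch below processes a small hypothetical item tree with a single worker, using a deque as either a stack or a queue. With several workers the order interleaves, so treat it as an illustration of the tendency rather than an exact trace of aqueue.

```python
from collections import deque

# Hypothetical item tree: each item name maps to the children it enqueues.
tree = {
    "root": ["page1", "page2"],
    "page1": ["img1a", "img1b"],
    "page2": ["img2a"],
    "img1a": [],
    "img1b": [],
    "img2a": [],
}


def processing_order(order: str) -> list[str]:
    pending = deque(["root"])
    seen = []
    while pending:
        # lifo pops the newest item (a stack); fifo pops the oldest (a queue)
        item = pending.pop() if order == "lifo" else pending.popleft()
        seen.append(item)
        pending.extend(tree[item])  # "process" the item: enqueue its children
    return seen


print(processing_order("lifo"))
print(processing_order("fifo"))
```

Under lifo, an image (a deepest item) is processed third; under fifo, no image is reached until every page is done.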

async aqueue.async_run_queue(*, initial_items, num_workers=5, order='lifo', graceful_ctrl_c=True, visualize=True)

The coroutine version of aqueue.run_queue. This function is useful if you want to bring your own event loop: either asyncio or trio event loops are usable with aqueue (those provided by AnyIO).

The parameters are the same as those of run_queue, except that async_backend is absent, since you start the event loop yourself.

import asyncio

import aqueue

class MyItem(aqueue.Item):
    async def process(self):
        pass

asyncio.run(aqueue.async_run_queue(
    initial_items=[MyItem()],
))

Return type:

None

aqueue.Ordering

alias of Literal['fifo', 'lifo', 'priority']

class aqueue.VisOptions(*, console=None)

A container for settings when visualizing the queue.

console: Optional[Console]

An explicit rich.console.Console to print the visualization to. Provide this if you are using a Console elsewhere in your code, because otherwise it will conflict with aqueue’s internal one.

Setting this to None will use the console returned by rich.get_console.

Sharing state

Often, it’s beneficial to share state between items. Using the website scraping example again, you may want to keep track of the URLs you’ve visited so you don’t scrape them twice.

If this is needed, keep an object at the module (global) or class level and store a key for each item in it. For example, a URL string may be a good key.

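import aqueue


class MyItem(aqueue.Item):
    # shared by every MyItem instance, because it lives on the class
    visited_urls = set()

    def __init__(self, url):
        super().__init__()
        self.url = url

    async def process(self):
        if self.url in self.visited_urls:
            return
        # record the URL before any awaiting work, so a duplicate item picked
        # up by another worker in the meantime is skipped
        self.visited_urls.add(self.url)
        ...  # do work


aqueue.run_queue(
    initial_items=[MyItem("https://example.com")],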
)
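Workers only switch at await points, so a membership check and the matching add must not be separated by an await; otherwise two items for the same URL can both pass the check. This stdlib sketch shows the difference when two duplicate items run concurrently. The coroutine names are hypothetical, and asyncio.sleep(0) stands in for an HTTP request.

```python
import asyncio


async def check_then_work_then_mark(url: str, visited: set, work_log: list) -> None:
    if url in visited:
        return
    await asyncio.sleep(0)  # I/O happens here; the other task runs meanwhile
    work_log.append(url)
    visited.add(url)  # too late: the duplicate already passed the check


async def mark_then_work(url: str, visited: set, work_log: list) -> None:
    if url in visited:
        return
    visited.add(url)  # claim the URL before the first await
    await asyncio.sleep(0)
    work_log.append(url)


async def run_twice(fn) -> list:
    visited: set = set()
    work_log: list = []
    # two items for the same URL running concurrently, as two workers would
    await asyncio.gather(fn("/a", visited, work_log), fn("/a", visited, work_log))
    return work_log


racy = asyncio.run(run_twice(check_then_work_then_mark))
safe = asyncio.run(run_twice(mark_then_work))
print(racy, safe)
```

The trade-off of claiming first is that if an item's work fails, its URL stays marked as visited for the rest of the run.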

If you don’t want to or can’t use a global variable, consider a contextvars.ContextVar.
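As a sketch of the ContextVar approach: in asyncio, each task starts with a copy of the context it was created in, so setting the variable to a set before tasks are created gives every task the same set object. Plain asyncio tasks stand in for aqueue workers here, and `VISITED` and `visit` are hypothetical names. Whether aqueue's workers see a value you set depends on where you set it relative to how the event loop and workers are started, so verify that for your backend.

```python
from __future__ import annotations

import asyncio
import contextvars

# Holds the shared set; no default, so it must be set before use.
VISITED: contextvars.ContextVar[set] = contextvars.ContextVar("VISITED")


async def visit(url: str) -> bool:
    """Return True if this call is the first to claim `url`."""
    visited = VISITED.get()  # the same set object in every task (see main)
    if url in visited:
        return False
    visited.add(url)
    await asyncio.sleep(0)  # pretend to fetch the page
    return True


async def main() -> list[bool]:
    VISITED.set(set())  # set before creating tasks so they copy this value
    return await asyncio.gather(visit("/a"), visit("/b"), visit("/a"))


result = asyncio.run(main())
print(result)
```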

Persisting state

During development, it’s likely that your program will crash after doing some work. For example, maybe your HTTP request timed out or you had a bug in your HTML parsing. It’s a shame to lose the work that’s already been done.

If you’re looking for a really handy way to persist state across runs, check out the built-in shelve module. It’s like a dict that automatically saves to a file each time you set a key in it.

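import shelve

import aqueue

SHELF = shelve.open("my-shelf")


class MyItem(aqueue.Item):
    def __init__(self, url):
        super().__init__()
        self.url = url

    async def process(self):
        if self.url in SHELF:
            # already done in a previous run; skip it
            return
        ...  # do work
        # record completion only after the work succeeds
        SHELF[self.url] = True


try:
    aqueue.run_queue(
        initial_items=[MyItem("https://example.com")],
    )
finally:
    # flush everything to disk, even if the run crashed
    SHELF.close()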

Otherwise, database inserts (such as with sqlite3), manual file saving, etc. are also fair game.
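For example, a sqlite3 table keyed by URL can record finished work. The `done` table and the `is_done`/`mark_done` helpers are hypothetical; the sketch uses an in-memory database so it runs anywhere, but you would pass a file path (for example "progress.db") to actually persist between runs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # use a file path to persist across runs
conn.execute("CREATE TABLE IF NOT EXISTS done (url TEXT PRIMARY KEY)")


def is_done(url: str) -> bool:
    row = conn.execute("SELECT 1 FROM done WHERE url = ?", (url,)).fetchone()
    return row is not None


def mark_done(url: str) -> None:
    # INSERT OR IGNORE makes marking the same URL twice harmless
    with conn:  # commits on success, rolls back on error
        conn.execute("INSERT OR IGNORE INTO done (url) VALUES (?)", (url,))


mark_done("https://example.com/a")
mark_done("https://example.com/a")
print(is_done("https://example.com/a"), is_done("https://example.com/b"))
```

These calls block the event loop briefly, which is usually acceptable for small, infrequent writes like these.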

Other cool things

The API is fully docstringed and type-hinted 🥳

Project Information