Async programming with Vaex

Using the Rich-based progress bar, we can see that calling two methods on a dataframe results in two passes over the data (indicated by the [1] and [2]).

python
import vaex

df = vaex.datasets.taxi()

with vaex.progress.tree('rich', title="Two passes"):
    print(df.tip_amount.sum())
    print(df.passenger_count.sum())

Using delay=True

If we pass delay=True, Vaex does not start executing the tasks it created internally, but returns a promise instead. After calling df.execute(), all tasks run and the promises are resolved. You can then call the .get() method to obtain the final value, or use the .then() method to chain further processing onto the result.

python
with vaex.progress.tree('rich', title="Single pass using delay"):
    tip_sum_promise = df.tip_amount.sum(delay=True)
    passengers_promise = df.passenger_count.sum(delay=True)
    df.execute()
    tip_per_passenger = tip_sum_promise.get() / passengers_promise.get()
    print(f"tip_per_passenger = {tip_per_passenger}")
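
Why deferring saves a pass can be illustrated with a toy model. The sketch below is not Vaex code and says nothing about how Vaex is implemented internally: ToyFrame and ToyPromise are hypothetical names, and the data is a small hand-written list. It only shows the general idea that tasks collected before execute() can share one loop over the data.

```python
# Toy model only: ToyPromise and ToyFrame are hypothetical, not Vaex classes.

class ToyPromise:
    """Holds a value that becomes available after execute()."""
    _UNSET = object()

    def __init__(self):
        self._value = self._UNSET

    def resolve(self, value):
        self._value = value

    def get(self):
        if self._value is self._UNSET:
            raise RuntimeError("promise not resolved yet; call execute() first")
        return self._value


class ToyFrame:
    def __init__(self, rows):
        self.rows = rows      # list of dicts, one dict per row
        self.passes = 0       # number of loops made over the data
        self._pending = []    # (column name, promise) pairs awaiting execution

    def sum(self, column, delay=False):
        promise = ToyPromise()
        self._pending.append((column, promise))
        if delay:
            return promise    # deferred: nothing is computed yet
        self.execute()        # eager: compute now, costing one pass
        return promise.get()

    def execute(self):
        if not self._pending:
            return
        totals = {column: 0 for column, _ in self._pending}
        self.passes += 1
        for row in self.rows:         # one loop serves every pending sum
            for column in totals:
                totals[column] += row[column]
        for column, promise in self._pending:
            promise.resolve(totals[column])
        self._pending = []


rows = [
    {"tip": 1.0, "passengers": 1},
    {"tip": 3.0, "passengers": 2},
    {"tip": 2.0, "passengers": 1},
]

# Eager calls: each sum triggers its own pass.
eager = ToyFrame(rows)
eager.sum("tip")
eager.sum("passengers")

# Deferred calls: both sums are computed in the same pass.
lazy = ToyFrame(rows)
tip = lazy.sum("tip", delay=True)
passengers = lazy.sum("passengers", delay=True)
lazy.execute()

print(eager.passes, lazy.passes)           # 2 1
print(tip.get() / passengers.get())        # 1.5
```

In this toy, calling .get() before execute() raises an error, which mirrors the ordering used in the Vaex example: create the promises, execute, then read the values.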

Using the @delayed decorator

To make life easier, Vaex provides the vaex.delayed decorator. A decorated function can be called with promises as arguments and returns a promise itself. Once all arguments are resolved, the decorated function is executed automatically.

python
with vaex.progress.tree('rich', title="Single pass using delay + using delayed"):
    @vaex.delayed
    def compute(tip_sum, passengers):
        return tip_sum/passengers

    tip_per_passenger_promise = compute(df.tip_amount.sum(delay=True),
                                        df.passenger_count.sum(delay=True))
    df.execute()
    print(f"tip_per_passenger = {tip_per_passenger_promise.get()}")

Async await

In all of the above cases, we called df.execute(), which executes all tasks synchronously using threads. However, if you are using asyncio in Python, this blocks the event loop, so no other coroutine can run until the call returns.
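
The effect of a blocking call on other coroutines can be reproduced with the standard library alone. In the sketch below, time.sleep is only a stand-in for a synchronous call such as df.execute(), and asyncio.sleep is a stand-in for an awaited, non-blocking call. All function names are hypothetical and no Vaex code is involved.

```python
import asyncio
import time


async def heartbeat(log):
    # A background coroutine that should keep ticking while other work runs.
    for _ in range(3):
        log.append("tick")
        await asyncio.sleep(0.01)


async def blocking_work(log):
    time.sleep(0.2)            # stand-in for a synchronous call: holds the event loop
    log.append("work done")


async def cooperative_work(log):
    await asyncio.sleep(0.2)   # stand-in for an awaited call: yields to the event loop
    log.append("work done")


async def run(work):
    log = []
    beat = asyncio.create_task(heartbeat(log))
    await work(log)
    await beat
    return log


blocking_log = asyncio.run(run(blocking_work))
cooperative_log = asyncio.run(run(cooperative_work))

print(blocking_log[0])      # "work done": the heartbeat could not start until the work finished
print(cooperative_log[0])   # "tick": the heartbeat ran while the work was waiting
```

In the blocking variant the heartbeat task is scheduled but never gets a turn until the synchronous call returns, which is the situation a web server handling other requests would be in.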

To allow other coroutines to continue running (e.g. in a FastAPI context), we can instead await df.execute_async(). On top of that, we can await the promise itself to get the result instead of calling .get(), which makes the code look more asyncio-like.

python
with vaex.progress.tree('rich', title="Single pass using delay + using delayed and await"):
    @vaex.delayed
    def compute(tip_sum, passengers):
        return tip_sum/passengers

    tip_per_passenger_promise = compute(df.tip_amount.sum(delay=True),
                                        df.passenger_count.sum(delay=True))
    await df.execute_async()
    tip_per_passenger = await tip_per_passenger_promise
    print(f"tip_per_passenger = {tip_per_passenger}")
<div class="alert alert-info">

Note: In the Jupyter notebook, an asyncio event loop is already running. In a script you may need to use asyncio.run(my_top_level_coroutine()) in order to use await.

</div>
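
As a rough sketch of what the note describes, a script can wrap its awaits in a top-level coroutine and hand it to asyncio.run. The coroutine body here is a placeholder so that the example runs without Vaex; my_top_level_coroutine is the name used in the note, while main and the return value are hypothetical.

```python
import asyncio


async def my_top_level_coroutine():
    # In a real script, the Vaex calls would go here, for instance
    # `await df.execute_async()` followed by awaiting the promise.
    await asyncio.sleep(0)   # placeholder await so the sketch runs without Vaex
    return "finished"


def main():
    # asyncio.run() creates an event loop, runs the coroutine to completion,
    # and closes the loop afterwards.
    return asyncio.run(my_top_level_coroutine())


if __name__ == "__main__":
    print(main())
```

asyncio.run cannot be called while an event loop is already running in the same thread, which is why this wrapper is needed in a script but not in a notebook cell.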

Async auto execute

In the previous example we had to call df.execute_async() manually. Deferring execution until that call is what enables Vaex to execute all tasks in as few passes over the data as possible.

To make life easier, and your code even more asyncio-like, we can use the df.executor.auto_execute() async context manager, which automatically calls df.execute_async() for you when a promise is awaited.

python
with vaex.progress.tree('rich', title="Single pass using auto_execute"):
    async with df.executor.auto_execute():
        @vaex.delayed
        def compute(tip_sum, passengers):
            return tip_sum/passengers

        tip_per_passenger = await compute(df.tip_amount.sum(delay=True),
                                          df.passenger_count.sum(delay=True))
        print(f"tip_per_passenger = {tip_per_passenger}")