Async programming with Vaex

Using the Rich-based progress bar, we can see that calling two aggregation methods on a DataFrame results in two passes over the data (indicated by the [1] and [2] at the end of the lines).

[1]:
import vaex

df = vaex.datasets.taxi()

with vaex.progress.tree('rich', title="Two passes"):
    print(df.tip_amount.sum())
    print(df.passenger_count.sum())
  Two passes                                    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.15s   
├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.06s   
    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.06s[2]
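To make the pass counting concrete, here is a toy, pure-Python model of eager evaluation. It is not how Vaex is implemented; `EagerFrame` and its `passes` counter are hypothetical names used only to show that every eager call scans the data on its own.

```python
class EagerFrame:
    """Toy frame (hypothetical): every aggregation scans the data immediately."""

    def __init__(self, columns):
        self.columns = columns  # dict: column name -> list of values
        self.passes = 0         # how many times the data has been scanned

    def sum(self, name):
        self.passes += 1        # each call triggers its own pass
        total = 0
        for value in self.columns[name]:
            total += value
        return total


df = EagerFrame({"tip_amount": [1.0, 2.5, 0.5], "passenger_count": [1, 3, 2]})
print(df.sum("tip_amount"))       # 4.0
print(df.sum("passenger_count"))  # 6
print(df.passes)                  # 2
```

Two method calls give two passes, mirroring the [1] and [2] in the progress tree above.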

Using delay=True

If we pass delay=True, Vaex will not start executing the tasks it creates internally, but will return a promise instead. After calling df.execute(), all queued tasks are executed together and the promises are resolved. You can then use the .get() method to obtain the final value, or the .then() method to chain a further computation on the result.

[2]:
with vaex.progress.tree('rich', title="Single pass using delay"):
    tip_sum_promise = df.tip_amount.sum(delay=True)
    passengers_promise = df.passenger_count.sum(delay=True)
    df.execute()
    tip_per_passenger = tip_sum_promise.get() / passengers_promise.get()
    print(f"tip_per_passenger = {tip_per_passenger}")
  Single pass using delay                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
tip_per_passenger = 0.5774000691888607
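The sketch below models the delay/execute/get/then pattern in plain Python. It is an illustration under our own assumptions, not Vaex's promise implementation: `Promise`, `LazyFrame` and the `passes` counter are hypothetical. Queued aggregations are all fed from a single scan over the rows, which is why both sums above report [1].

```python
class Promise:
    """Minimal stand-in for a promise (hypothetical): holds a value once resolved."""

    def __init__(self):
        self._resolved = False
        self._value = None
        self._callbacks = []

    def resolve(self, value):
        self._value, self._resolved = value, True
        for callback in self._callbacks:
            callback(value)

    def get(self):
        if not self._resolved:
            raise RuntimeError("not resolved yet: call execute() first")
        return self._value

    def then(self, fn):
        """Return a new promise that resolves to fn(value)."""
        chained = Promise()
        if self._resolved:
            chained.resolve(fn(self._value))
        else:
            self._callbacks.append(lambda value: chained.resolve(fn(value)))
        return chained


class LazyFrame:
    """Toy frame (hypothetical) that queues aggregations instead of computing them."""

    def __init__(self, columns):
        self.columns = columns  # dict: column name -> list of values
        self.passes = 0         # how many times the rows have been scanned
        self._tasks = []        # (column name, promise) pairs not yet computed

    def sum(self, name, delay=False):
        promise = Promise()
        self._tasks.append((name, promise))
        if delay:
            return promise      # caller must call execute() later
        self.execute()
        return promise.get()

    def execute(self):
        if not self._tasks:
            return
        self.passes += 1
        totals = {name: 0 for name, _ in self._tasks}
        n_rows = len(next(iter(self.columns.values())))
        for row in range(n_rows):          # a single scan over the rows...
            for name in totals:            # ...feeds every queued aggregation
                totals[name] += self.columns[name][row]
        tasks, self._tasks = self._tasks, []
        for name, promise in tasks:
            promise.resolve(totals[name])


df = LazyFrame({"tip_amount": [1.0, 2.5, 0.5], "passenger_count": [1, 3, 2]})
tip_sum = df.sum("tip_amount", delay=True)
passengers = df.sum("passenger_count", delay=True)
doubled = tip_sum.then(lambda total: total * 2)  # chained, still unresolved
df.execute()
print(tip_sum.get() / passengers.get())
print(doubled.get(), df.passes)          # 8.0 1
```

The design choice worth noting is that `execute()` drains the whole task queue at once, so the number of passes depends on how often it is called, not on how many aggregations were requested.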

Using the @delayed decorator

To make life easier, Vaex provides the vaex.delayed decorator. A decorated function can be called with promises as arguments and itself returns a promise; once all of its arguments are resolved, the function is executed automatically.

[3]:
with vaex.progress.tree('rich', title="Single pass using delay + using delayed"):
    @vaex.delayed
    def compute(tip_sum, passengers):
        return tip_sum/passengers

    tip_per_passenger_promise = compute(df.tip_amount.sum(delay=True),
                                        df.passenger_count.sum(delay=True))
    df.execute()
    print(f"tip_per_passenger = {tip_per_passenger_promise.get()}")
  Single pass using delay + using delayed       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
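One way such a decorator can work is sketched below, using the standard library's `concurrent.futures.Future` as the promise type. This is an assumption for illustration: Vaex has its own promise class, and the `delayed` function here is a hypothetical re-implementation, not Vaex's code. The wrapper counts unresolved arguments and calls the function when the last one completes.

```python
import functools
import threading
from concurrent.futures import Future


def delayed(fn):
    """Hypothetical decorator: fn runs once every Future argument has a result."""
    @functools.wraps(fn)
    def wrapper(*args):
        out = Future()
        pending = [a for a in args if isinstance(a, Future)]
        remaining = len(pending)
        lock = threading.Lock()  # callbacks may fire from different threads

        def run():
            try:
                values = [a.result() if isinstance(a, Future) else a for a in args]
                out.set_result(fn(*values))
            except Exception as exc:  # propagate failures to the returned Future
                out.set_exception(exc)

        def on_done(_future):
            nonlocal remaining
            with lock:
                remaining -= 1
                last = remaining == 0
            if last:
                run()

        if not pending:
            run()
        for a in pending:
            a.add_done_callback(on_done)  # fires immediately if already done
        return out
    return wrapper


@delayed
def compute(tip_sum, passengers):
    return tip_sum / passengers


tip_sum, passengers = Future(), Future()
ratio = compute(tip_sum, passengers)
print(ratio.done())        # False: inputs not resolved yet
tip_sum.set_result(4.0)    # stands in for the single pass over the data
passengers.set_result(8)
print(ratio.result())      # 0.5
```

Plain (non-promise) arguments are passed through unchanged, so `compute(6.0, 2)` resolves immediately.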

Async await

In all of the above cases, we called df.execute(), which synchronously executes all tasks using threads. However, if you are using asyncio in Python, this blocks the event loop, so no other coroutines can run until it finishes.

To allow other coroutines to keep running (e.g. in a FastAPI application), we can instead await df.execute_async(). We can also await the promise itself to obtain the result instead of calling .get(), which makes the code read more like idiomatic asyncio.

[4]:
with vaex.progress.tree('rich', title="Single pass using delay + using delayed and await"):
    @vaex.delayed
    def compute(tip_sum, passengers):
        return tip_sum/passengers

    tip_per_passenger_promise = compute(df.tip_amount.sum(delay=True),
                                        df.passenger_count.sum(delay=True))
    await df.execute_async()
    tip_per_passenger = await tip_per_passenger_promise
    print(f"tip_per_passenger = {tip_per_passenger}")
  Single pass using delay + using delayed and await ━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.14s   
├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.09s   
│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
tip_per_passenger = 0.5774000691888603

Note: In a Jupyter notebook, an asyncio event loop is already running, so top-level await works. In a plain Python script, top-level await is not allowed; put the code in a coroutine and run it with asyncio.run(my_top_level_coroutine()).
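The difference between blocking and awaiting can be seen with the standard library alone. The script below is a sketch under our own assumptions: `execute` is a hypothetical stand-in for a blocking, thread-based pass over the data, and `asyncio.to_thread` (Python 3.9+) is used to off-load it. We do not claim that df.execute_async() is implemented this way. A `heartbeat` coroutine records ticks; it is frozen during the blocking call but keeps running while the work is awaited. The script also uses the asyncio.run entry point described in the note.

```python
import asyncio
import time


def execute():
    """Hypothetical stand-in for a blocking, thread-based pass over the data."""
    time.sleep(0.2)
    return 42


async def heartbeat(ticks):
    """Another coroutine that should keep running while execute() works."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)               # let the heartbeat start

    before = len(ticks)
    execute()                            # blocking: the event loop is frozen
    blocked_ticks = len(ticks) - before

    before = len(ticks)
    result = await asyncio.to_thread(execute)  # work runs in a worker thread
    awaited_ticks = len(ticks) - before

    beat.cancel()
    return result, blocked_ticks, awaited_ticks


result, blocked_ticks, awaited_ticks = asyncio.run(main())
# blocked_ticks is always 0; awaited_ticks is positive (exact value is timing-dependent)
print(result, blocked_ticks, awaited_ticks)
```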

Async auto execute

In the previous example, we had to call df.execute_async() manually. Doing so lets Vaex execute all tasks in as few passes over the data as possible.

To make life easier, and the code even more asyncio-like, we can use the df.executor.auto_execute() async context manager, which automatically calls df.execute_async() for us when a promise is awaited.

[5]:
with vaex.progress.tree('rich', title="Single pass using auto_execute"):
    async with df.executor.auto_execute():
        @vaex.delayed
        def compute(tip_sum, passengers):
            return tip_sum/passengers

        tip_per_passenger = await compute(df.tip_amount.sum(delay=True),
                                          df.passenger_count.sum(delay=True))
        print(f"tip_per_passenger = {tip_per_passenger}")
  Single pass using auto_execute                ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
tip_per_passenger = 0.5774000691888609
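The behaviour of auto_execute can be imitated with a small async context manager. This is a hypothetical sketch, not Vaex's executor: `ToyExecutor`, `ToyPromise` and `schedule` are invented names, and the "pass" is simulated. Inside the context, awaiting an unresolved promise first runs one execution for every queued task, so a second await finds its value already available.

```python
import asyncio
import contextlib


class ToyExecutor:
    """Hypothetical executor: queues tasks and runs them all in one 'pass'."""

    def __init__(self):
        self.tasks = []     # (function, future) pairs not yet computed
        self.passes = 0
        self.auto = False   # True inside auto_execute()

    def schedule(self, fn):
        future = asyncio.get_running_loop().create_future()
        self.tasks.append((fn, future))
        return ToyPromise(self, future)

    async def execute_async(self):
        if not self.tasks:
            return
        self.passes += 1
        tasks, self.tasks = self.tasks, []
        await asyncio.sleep(0)   # stand-in for the real, non-blocking work
        for fn, future in tasks:
            future.set_result(fn())

    @contextlib.asynccontextmanager
    async def auto_execute(self):
        self.auto = True
        try:
            yield
        finally:
            self.auto = False


class ToyPromise:
    def __init__(self, executor, future):
        self.executor = executor
        self.future = future

    def __await__(self):
        if not self.future.done() and self.executor.auto:
            # Awaiting an unresolved promise triggers one pass for ALL queued tasks
            yield from self.executor.execute_async().__await__()
        return (yield from self.future.__await__())


async def main():
    executor = ToyExecutor()
    async with executor.auto_execute():
        tip_sum = executor.schedule(lambda: 4.0)
        passengers = executor.schedule(lambda: 8)
        tip = await tip_sum        # triggers the single pass
        n = await passengers       # already resolved, no new pass
    return tip / n, executor.passes


ratio, passes = asyncio.run(main())
print(ratio, passes)  # 0.5 1
```

Outside the context manager, awaiting an unresolved `ToyPromise` would wait until something else calls `execute_async()`, which matches the manual pattern from the previous section.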