Use xpersist with Prefect

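Let’s begin by importing the necessary packages and enabling checkpointing. The environment variable must be set before prefect is imported, because Prefect reads its configuration at import time.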

import os

os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "True"

import tempfile
import time
from xpersist import XpersistResult, CacheStore
from prefect import Flow, task
import prefect
import xarray as xr

Ensure that checkpointing is enabled:

prefect.context.to_dict()['config']['flows']['checkpointing']
True
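The environment variable name above follows Prefect's convention of a PREFECT__ prefix with double underscores marking nesting levels. The sketch below is a simplified, standard-library illustration of that mapping, not Prefect's own loader; the helper name env_to_nested_config is hypothetical.

```python
def env_to_nested_config(environ, prefix="PREFECT__"):
    """Build a nested dict from prefixed environment-style variables.

    Hypothetical helper for illustration only; Prefect's real loader
    handles more value types and configuration sources.
    """
    config = {}
    for name, raw in environ.items():
        if not name.startswith(prefix):
            continue
        # "PREFECT__FLOWS__CHECKPOINTING" -> ["flows", "checkpointing"]
        keys = name[len(prefix):].lower().split("__")
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        # Interpret "true"/"false" (any case) as booleans; keep other values as strings.
        value = raw.lower() == "true" if raw.lower() in ("true", "false") else raw
        node[keys[-1]] = value
    return config


config = env_to_nested_config({"PREFECT__FLOWS__CHECKPOINTING": "True"})
print(config["flows"]["checkpointing"])
```

Under this reading, the variable set earlier corresponds to the config entry flows.checkpointing, which is the value inspected in the cell above.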

Set Cache Location

store = CacheStore(f'{tempfile.gettempdir()}/my-cache')
store
CacheStore(path='/tmp/my-cache', readonly=False, on_duplicate_key=<DuplicateKeyEnum.skip: 'skip'>, storage_options={})

Set Prefect Flow

To persist the output of a task in xpersist’s cache store, define an xpersist.XpersistResult object and pass it to the result argument of the task decorator. The target argument sets the key under which the output is stored.

@task(
    # Key under which the output is stored in the cache store.
    target="bar.zarr",
    result=XpersistResult(
        store, serializer="xarray.zarr", serializer_dump_kwargs={"mode": "w"}
    ),
)
def get_data():
    ds = xr.DataArray(range(10), dims="x", name="bar").to_dataset()
    time.sleep(5)  # Simulate an expensive computation.
    return ds


@task
def total(ds):
    return ds.sum()


with Flow("my-flow") as flow:
    ds = get_data()
    total(ds)
flow.visualize()

Run the Flow

Now we can run the flow. Notice that the flow runs for about five seconds and that the result of the get_data() task is cached.

%%time
flow.run()
[2022-09-06 03:25:18+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'my-flow'
[2022-09-06 03:25:18+0000] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'total': Starting task run...
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'total': Finished task run for task with final state: 'Success'
[2022-09-06 03:25:23+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
CPU times: user 33.9 ms, sys: 3.67 ms, total: 37.6 ms
Wall time: 5.04 s
<Success: "All reference tasks succeeded.">

Let’s retrieve the artifact metadata to confirm that the result was properly cached:

import pprint
artifact = store.get_artifact("bar.zarr")
pprint.pprint(artifact.dict())
{'additional_metadata': {'flow_id': 'my-flow',
                         'flow_name': 'my-flow',
                         'flow_run_id': '6b7b1580-aa10-4356-a91e-4fecf731b94a',
                         'flow_run_name': '96774666-e131-4f90-bc46-a7e2d7917853',
                         'flow_run_version': '',
                         'map_index': None,
                         'task_full_name': 'get_data',
                         'task_id': 'get_data-1',
                         'task_name': 'get_data',
                         'task_run_id': '7c53bedf-8f0b-486f-b623-6e6de9c43bcf',
                         'task_run_name': '',
                         'task_run_version': '',
                         'task_slug': 'get_data-1',
                         'task_tags': [],
                         'today': '2022-09-06',
                         'tomorrow': '2022-09-07',
                         'yesterday': '2022-09-05'},
 'created_at': datetime.datetime(2022, 9, 6, 3, 25, 23, 631148),
 'dump_kwargs': {'mode': 'w'},
 'key': 'bar.zarr',
 'load_kwargs': {},
 'serializer': 'xarray.zarr'}

Running the flow again will retrieve the result from the cache instead of running the task again:

%%time
flow.run()
[2022-09-06 03:25:23+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'my-flow'
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Cached'
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'total': Starting task run...
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'total': Finished task run for task with final state: 'Success'
[2022-09-06 03:25:23+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
CPU times: user 34.5 ms, sys: 3.76 ms, total: 38.2 ms
Wall time: 39.3 ms
<Success: "All reference tasks succeeded.">

Notice that the flow takes milliseconds to run instead of the original five seconds.
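The behavior seen in the two runs above (first 'Success', then 'Cached') can be modeled with a small standard-library sketch. This is an assumed, simplified model of target-based caching, not the actual Prefect or xpersist implementation; run_with_target and the plain dict standing in for the cache store are hypothetical.

```python
calls = {"get_data": 0}


def get_data():
    """Stand-in for the expensive task; counts how often it really runs."""
    calls["get_data"] += 1
    return list(range(10))


def run_with_target(fn, target, store):
    """Return (state, value), skipping fn when target is already stored.

    Hypothetical helper: a dict plays the role of the cache store.
    """
    if target in store:
        return "Cached", store[target]
    value = fn()
    store[target] = value  # Persist the result under the target key.
    return "Success", value


cache = {}
first_state, first_value = run_with_target(get_data, "bar.zarr", cache)
second_state, second_value = run_with_target(get_data, "bar.zarr", cache)
print(first_state, second_state, calls["get_data"])
```

In this model the task body executes only once; the second call finds the key and returns the stored value, which is why the second flow run finishes in milliseconds.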

Note

Targets can optionally be templated using values found in prefect.context. For example, the following target specification stores data under a key built from (1) the day of the week on which the flow runs, (2) the name of the flow, and (3) the name of the task:

@task(
    target="{date:%A}-{flow_name}-{task_name}.zarr",
    result=XpersistResult(
        store, serializer="xarray.zarr", serializer_dump_kwargs={"mode": "w"}
    ),
)
def foo_task():
    ds = xr.DataArray(range(10), dims="x", name="bar").to_dataset()
    time.sleep(5)
    return ds

with Flow("sample_flow") as flow:
    ds = foo_task()

%%time
flow.run()
[2022-09-06 03:25:23+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'sample_flow'
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'foo_task': Starting task run...
[2022-09-06 03:25:28+0000] INFO - prefect.TaskRunner | Task 'foo_task': Finished task run for task with final state: 'Success'
[2022-09-06 03:25:28+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
CPU times: user 22.2 ms, sys: 303 µs, total: 22.5 ms
Wall time: 5.03 s
<Success: "All reference tasks succeeded.">

%%time
flow.run()
[2022-09-06 03:25:28+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'sample_flow'
[2022-09-06 03:25:28+0000] INFO - prefect.TaskRunner | Task 'foo_task': Starting task run...
[2022-09-06 03:25:28+0000] INFO - prefect.TaskRunner | Task 'foo_task': Finished task run for task with final state: 'Cached'
[2022-09-06 03:25:28+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
CPU times: user 18.5 ms, sys: 0 ns, total: 18.5 ms
Wall time: 18.2 ms
<Success: "All reference tasks succeeded.">

store.keys()
['Tuesday-sample_flow-foo_task.zarr', 'bar.zarr']
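The key 'Tuesday-sample_flow-foo_task.zarr' can be reproduced with plain Python string formatting. The sketch below assumes the template is rendered in the style of str.format against context values; the context dict here is a hand-built stand-in for prefect.context, using the run date shown in the logs.

```python
from datetime import datetime

# Hand-built stand-in for the relevant prefect.context values (assumption).
context = {
    "date": datetime(2022, 9, 6),
    "flow_name": "sample_flow",
    "task_name": "foo_task",
}

template = "{date:%A}-{flow_name}-{task_name}.zarr"

# "{date:%A}" applies the strftime directive %A (full weekday name) to the datetime.
key = template.format(**context)
print(key)
```

Because the weekday is part of the key, a run on a different day of the week would resolve to a different key and would therefore recompute the task rather than reuse this cached result.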