Use xpersist with Prefect

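Let’s begin by importing the necessary packages and enabling checkpointing. The environment variable must be set before prefect is imported, because Prefect reads its configuration at import time.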

import os

os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "True"

import tempfile
import time
from xpersist import XpersistResult, CacheStore
from prefect import Flow, task
import prefect
import xarray as xr

Ensure that checkpointing is enabled:
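One way to do this, sketched with the standard library only, is to check that the environment variable holds a true-like value. The helper name checkpointing_requested is hypothetical and is not part of Prefect or xpersist.

```python
import os


def checkpointing_requested(environ=None):
    """Return True if PREFECT__FLOWS__CHECKPOINTING is set to a true-like value.

    Hypothetical helper for illustration; not part of Prefect or xpersist.
    """
    environ = os.environ if environ is None else environ
    value = environ.get("PREFECT__FLOWS__CHECKPOINTING", "")
    return value.strip().lower() == "true"


# Same setting as in the import cell above
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "True"
print(checkpointing_requested())
```

Assuming Prefect 1.x, the parsed setting should also be readable as prefect.config.flows.checkpointing once prefect has been imported.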


Set Cache Location

store = CacheStore(f'{tempfile.gettempdir()}/my-cache')
CacheStore(path='/tmp/my-cache', readonly=False, on_duplicate_key=<DuplicateKeyEnum.skip: 'skip'>, storage_options={})

Set Prefect Flow

To persist the output of a task in xpersist’s cache store, define an xpersist.XpersistResult object and pass it to the result argument of the task decorator.

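@task(
    target="bar.zarr",  # key under which the result is stored
    result=XpersistResult(
        store, serializer="xarray.zarr", serializer_dump_kwargs={"mode": "w"}
    ),
)
def get_data():
    ds = xr.DataArray(range(10), dims="x", name="bar").to_dataset()
    time.sleep(5)  # simulate a slow computation
    return ds

@task
def total(ds):
    return ds.sum()

with Flow("my-flow") as flow:
    ds = get_data()
    total(ds)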

Run the Flow

Now we can run the flow. Notice that the flow runs for about five seconds and that the result of the get_data() task is cached.

%%time
flow.run()

[2021-12-16 22:01:58+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'my-flow'
[2021-12-16 22:01:58+0000] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
[2021-12-16 22:02:03+0000] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2021-12-16 22:02:03+0000] INFO - prefect.TaskRunner | Task 'total': Starting task run...
[2021-12-16 22:02:03+0000] INFO - prefect.TaskRunner | Task 'total': Finished task run for task with final state: 'Success'
[2021-12-16 22:02:03+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
CPU times: user 67.8 ms, sys: 3.97 ms, total: 71.7 ms
Wall time: 5.08 s
<Success: "All reference tasks succeeded.">

Let’s retrieve the artifact metadata to confirm that the result was properly cached:

import pprint

artifact = store.get_artifact("bar.zarr")
pprint.pprint(artifact.dict())  # assumes the artifact is a pydantic model with .dict()
{'additional_metadata': {'flow_id': 'my-flow',
                         'flow_name': 'my-flow',
                         'flow_run_id': 'dd53860b-272e-427b-bc57-d5c157fe7434',
                         'flow_run_name': '46471fec-ca3e-42b8-81a3-09bf1436b1bd',
                         'flow_run_version': '',
                         'map_index': None,
                         'task_full_name': 'get_data',
                         'task_id': 'get_data-1',
                         'task_name': 'get_data',
                         'task_run_id': 'f14c96ef-078a-4865-8f0c-44d6586ebb63',
                         'task_run_name': '',
                         'task_run_version': '',
                         'task_slug': 'get_data-1',
                         'task_tags': [],
                         'today': '2021-12-16',
                         'tomorrow': '2021-12-17',
                         'yesterday': '2021-12-15'},
 'created_at': datetime.datetime(2021, 12, 16, 22, 2, 3, 271315),
 'dump_kwargs': {'mode': 'w'},
 'key': 'bar.zarr',
 'load_kwargs': {},
 'serializer': 'xarray.zarr'}

Running the flow again retrieves the result from the cache instead of re-running the task:

%%time
flow.run()

[2021-12-16 22:02:03+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'my-flow'
[2021-12-16 22:02:03+0000] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
[2021-12-16 22:02:03+0000] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Cached'
[2021-12-16 22:02:03+0000] INFO - prefect.TaskRunner | Task 'total': Starting task run...
[2021-12-16 22:02:03+0000] INFO - prefect.TaskRunner | Task 'total': Finished task run for task with final state: 'Success'
[2021-12-16 22:02:03+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
CPU times: user 80.3 ms, sys: 12.1 ms, total: 92.4 ms
Wall time: 91.1 ms
<Success: "All reference tasks succeeded.">

Notice that the flow takes milliseconds to run instead of the original five seconds.
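
The speed-up comes from target-based caching: before running a task, the runner checks whether the task's target already exists and, if so, loads the stored result instead of recomputing it. The following is a minimal standard-library sketch of that idea, not Prefect's or xpersist's actual implementation; run_with_target, slow_sum, and the JSON file format are hypothetical stand-ins.

```python
import json
import tempfile
from pathlib import Path


def run_with_target(cache_dir, target, compute):
    """Return (value, state); reuse the stored value if the target exists."""
    path = Path(cache_dir) / target
    if path.exists():
        # Target found: load the stored result and skip the computation
        return json.loads(path.read_text()), "Cached"
    value = compute()
    path.write_text(json.dumps(value))  # persist the result for later runs
    return value, "Success"


def slow_sum():
    # Stand-in for an expensive task
    return sum(range(10))


with tempfile.TemporaryDirectory() as cache_dir:
    first = run_with_target(cache_dir, "bar.json", slow_sum)
    second = run_with_target(cache_dir, "bar.json", slow_sum)

print(first)   # (45, 'Success')
print(second)  # (45, 'Cached')
```

The second call never invokes slow_sum, which mirrors the 'Cached' state reported for get_data in the second run.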


Note that targets can optionally be templated, using values found in prefect.context. For example, the following target specification will store data based on (1) the day of the week the flow is run on, (2) the name of the flow, and (3) the name of the task:

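@task(
    target="{date:%A}-{flow_name}-{task_name}.zarr",
    result=XpersistResult(
        store, serializer="xarray.zarr", serializer_dump_kwargs={"mode": "w"}
    ),
)
def foo_task():
    ds = xr.DataArray(range(10), dims="x", name="bar").to_dataset()
    time.sleep(5)  # simulate a slow computation
    return ds

with Flow("sample_flow") as flow:
    ds = foo_task()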

%%time
flow.run()

[2021-12-16 22:02:03+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'sample_flow'
[2021-12-16 22:02:03+0000] INFO - prefect.TaskRunner | Task 'foo_task': Starting task run...
[2021-12-16 22:02:08+0000] INFO - prefect.TaskRunner | Task 'foo_task': Finished task run for task with final state: 'Success'
[2021-12-16 22:02:08+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
CPU times: user 34 ms, sys: 7.51 ms, total: 41.5 ms
Wall time: 5.04 s
<Success: "All reference tasks succeeded.">

Running the flow a second time retrieves the result from the cache:

%%time
flow.run()

[2021-12-16 22:02:08+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'sample_flow'
[2021-12-16 22:02:08+0000] INFO - prefect.TaskRunner | Task 'foo_task': Starting task run...
[2021-12-16 22:02:08+0000] INFO - prefect.TaskRunner | Task 'foo_task': Finished task run for task with final state: 'Cached'
[2021-12-16 22:02:08+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
CPU times: user 37.6 ms, sys: 330 µs, total: 37.9 ms
Wall time: 37.1 ms
<Success: "All reference tasks succeeded.">

Finally, list the keys held in the cache store:

store.keys()

['my-dataset', 'Thursday-sample_flow-foo_task.zarr', 'bar.zarr']
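
The key Thursday-sample_flow-foo_task.zarr in the listing above is what a templated target produces once its placeholders are filled from prefect.context. As a minimal sketch, the same substitution can be reproduced with Python's str.format. The template string is an assumption inferred from the resulting key, and the context dictionary is a hypothetical stand-in for prefect.context, with the date taken from the log timestamps.

```python
from datetime import datetime

# Assumed template, inferred from the resulting key
target = "{date:%A}-{flow_name}-{task_name}.zarr"

# Hypothetical stand-in for the relevant entries of prefect.context
context = {
    "date": datetime(2021, 12, 16, 22, 2, 3),
    "flow_name": "sample_flow",
    "task_name": "foo_task",
}

key = target.format(**context)
print(key)  # Thursday-sample_flow-foo_task.zarr
```

Because the day of the week is part of the key, a run on a different weekday would not match this cache entry and the task would be computed again.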