Use xpersist with Prefect
Let’s begin by importing the necessary packages and enabling checkpointing.
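import os

# Prefect 1 reads PREFECT__* environment variables into its config at import time,
# so enable checkpointing before importing prefect.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "True"

import tempfile
import time

import prefect
import xarray as xr
from prefect import Flow, task
from xpersist import CacheStore, XpersistResult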
Ensure that checkpointing is enabled:
prefect.context.to_dict()['config']['flows']['checkpointing']
True
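Prefect 1.x builds its configuration when it is imported, and an environment variable such as PREFECT__FLOWS__CHECKPOINTING maps to the nested setting flows.checkpointing by splitting the name on double underscores. That is why the variable is set before import prefect above. A minimal sketch of that mapping, assuming simple "true"/"false" string casting; env_to_config is a hypothetical helper, not Prefect's own loader:

```python
from typing import Any, Dict, Mapping


def env_to_config(environ: Mapping[str, str], prefix: str = "PREFECT__") -> Dict[str, Any]:
    """Build a nested config dict from PREFECT__SECTION__KEY-style variables.

    Hypothetical helper for illustration only; Prefect's real loader does more.
    """
    config: Dict[str, Any] = {}
    for name, raw in environ.items():
        if not name.startswith(prefix):
            continue  # ignore unrelated variables such as HOME
        # "PREFECT__FLOWS__CHECKPOINTING" -> ["flows", "checkpointing"]
        path = name[len(prefix):].lower().split("__")
        # Assumed casting rule: boolean-looking strings become bools, others stay text.
        value: Any = {"true": True, "false": False}.get(raw.lower(), raw)
        node = config
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return config


print(env_to_config({"PREFECT__FLOWS__CHECKPOINTING": "True", "HOME": "/root"}))
# {'flows': {'checkpointing': True}}
```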
Set Cache Location
store = CacheStore(f'{tempfile.gettempdir()}/my-cache')
store
CacheStore(path='/tmp/my-cache', readonly=False, on_duplicate_key=<DuplicateKeyEnum.skip: 'skip'>, storage_options={})
Define the Prefect Flow
To persist a task’s output in xpersist’s cache store, define an xpersist.XpersistResult object and pass it to the result argument of the task decorator. The target argument sets the key under which the output is stored in the cache.
@task(
    target="bar.zarr",  # cache key for this task's output
    result=XpersistResult(
        store, serializer="xarray.zarr", serializer_dump_kwargs={"mode": "w"}
    ),
)
def get_data():
    ds = xr.DataArray(range(10), dims="x", name="bar").to_dataset()
    time.sleep(5)  # simulate an expensive computation
    return ds


@task
def total(ds):
    return ds.sum()


with Flow("my-flow") as flow:
    ds = get_data()
    total(ds)
flow.visualize()
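Conceptually, a result object such as the XpersistResult attached to get_data gives Prefect three operations on a target: check whether it exists, read it, and write it. The sketch below mirrors that idea with pickle files in a directory. PickleDirResult and its simplified exists/read/write signatures are hypothetical; they are not Prefect's or xpersist's API.

```python
import os
import pickle
import tempfile
from typing import Any


class PickleDirResult:
    """Hypothetical result-like object that stores values as pickle files in a directory."""

    def __init__(self, directory: str) -> None:
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def _path(self, location: str) -> str:
        # The target name becomes a file name inside the directory.
        return os.path.join(self.directory, location)

    def exists(self, location: str) -> bool:
        return os.path.exists(self._path(location))

    def write(self, location: str, value: Any) -> None:
        with open(self._path(location), "wb") as f:
            pickle.dump(value, f)

    def read(self, location: str) -> Any:
        with open(self._path(location), "rb") as f:
            return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp:
    result = PickleDirResult(tmp)
    before = result.exists("bar.pkl")  # nothing written yet
    result.write("bar.pkl", {"bar": list(range(10))})
    after = result.exists("bar.pkl")
    loaded = result.read("bar.pkl")

print(before, after, loaded)
```

The real store uses a named serializer ("xarray.zarr" here) instead of pickle, which is why the serializer and its dump keyword arguments are passed to XpersistResult.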
Run the Flow
Now we can run the flow. The first run takes about five seconds because get_data actually executes, and its output is written to the cache store under the key bar.zarr.
%%time
flow.run()
[2022-09-06 03:25:18+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'my-flow'
[2022-09-06 03:25:18+0000] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'total': Starting task run...
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'total': Finished task run for task with final state: 'Success'
[2022-09-06 03:25:23+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
CPU times: user 33.9 ms, sys: 3.67 ms, total: 37.6 ms
Wall time: 5.04 s
<Success: "All reference tasks succeeded.">
Let’s retrieve the artifact metadata to confirm that the result was properly cached:
import pprint
artifact = store.get_artifact("bar.zarr")
pprint.pprint(artifact.dict())
{'additional_metadata': {'flow_id': 'my-flow',
'flow_name': 'my-flow',
'flow_run_id': '6b7b1580-aa10-4356-a91e-4fecf731b94a',
'flow_run_name': '96774666-e131-4f90-bc46-a7e2d7917853',
'flow_run_version': '',
'map_index': None,
'task_full_name': 'get_data',
'task_id': 'get_data-1',
'task_name': 'get_data',
'task_run_id': '7c53bedf-8f0b-486f-b623-6e6de9c43bcf',
'task_run_name': '',
'task_run_version': '',
'task_slug': 'get_data-1',
'task_tags': [],
'today': '2022-09-06',
'tomorrow': '2022-09-07',
'yesterday': '2022-09-05'},
'created_at': datetime.datetime(2022, 9, 6, 3, 25, 23, 631148),
'dump_kwargs': {'mode': 'w'},
'key': 'bar.zarr',
'load_kwargs': {},
'serializer': 'xarray.zarr'}
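The record pairs the artifact key with what is needed to reload it (the serializer name and its load/dump keyword arguments) plus Prefect run context. A minimal sketch of such a record with a JSON round trip, assuming JSON as the persistence format; ArtifactRecord is a hypothetical class that only mirrors the printed fields and is not xpersist's implementation.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Any, Dict


@dataclass
class ArtifactRecord:
    """Hypothetical metadata record mirroring the fields printed above."""

    key: str
    serializer: str
    load_kwargs: Dict[str, Any] = field(default_factory=dict)
    dump_kwargs: Dict[str, Any] = field(default_factory=dict)
    additional_metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)

    def to_json(self) -> str:
        data = asdict(self)
        data["created_at"] = self.created_at.isoformat()  # datetimes are not JSON-native
        return json.dumps(data, sort_keys=True)

    @classmethod
    def from_json(cls, text: str) -> "ArtifactRecord":
        data = json.loads(text)
        data["created_at"] = datetime.fromisoformat(data["created_at"])
        return cls(**data)


record = ArtifactRecord(
    key="bar.zarr",
    serializer="xarray.zarr",
    dump_kwargs={"mode": "w"},
    additional_metadata={"flow_name": "my-flow", "task_name": "get_data"},
    created_at=datetime(2022, 9, 6, 3, 25, 23, 631148),
)
restored = ArtifactRecord.from_json(record.to_json())
print(restored == record)  # True
```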
Running the flow again will retrieve the result from the cache instead of running the task again:
%%time
flow.run()
[2022-09-06 03:25:23+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'my-flow'
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Cached'
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'total': Starting task run...
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'total': Finished task run for task with final state: 'Success'
[2022-09-06 03:25:23+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
CPU times: user 34.5 ms, sys: 3.76 ms, total: 38.2 ms
Wall time: 39.3 ms
<Success: "All reference tasks succeeded.">
The second run takes about 39 ms of wall time instead of about five seconds: get_data finishes in the 'Cached' state, so its output is loaded from the store rather than recomputed.
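The check behind the 'Cached' state can be sketched roughly as: if an output already exists for the target, load it; otherwise run the task and store its output. This minimal sketch assumes an in-memory dict (memory_cache) in place of a persistent store and shortens the sleep to half a second; run_task and slow_get_data are hypothetical names, not Prefect internals.

```python
import time
from typing import Any, Callable, Dict, Tuple

# Hypothetical in-memory stand-in for a persistent cache store.
memory_cache: Dict[str, Any] = {}


def run_task(fn: Callable[[], Any], target: str) -> Tuple[Any, str]:
    """Return the cached value for `target` if present, else run `fn` and cache it."""
    if target in memory_cache:
        return memory_cache[target], "Cached"
    value = fn()
    memory_cache[target] = value
    return value, "Success"


def slow_get_data():
    time.sleep(0.5)  # shortened stand-in for the five-second sleep
    return list(range(10))


timings = []
states = []
for _ in range(2):
    start = time.perf_counter()
    _, state = run_task(slow_get_data, "bar.zarr")
    timings.append(time.perf_counter() - start)
    states.append(state)

print(states)  # ['Success', 'Cached']
print([f"{t:.3f}s" for t in timings])
```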
Note
Targets can optionally be templated using values from prefect.context. For example, the following target stores data under a key built from (1) the day of the week on which the flow runs, (2) the name of the flow, and (3) the name of the task:
@task(
    target="{date:%A}-{flow_name}-{task_name}.zarr",
    result=XpersistResult(
        store, serializer="xarray.zarr", serializer_dump_kwargs={"mode": "w"}
    ),
)
def foo_task():
    ds = xr.DataArray(range(10), dims="x", name="bar").to_dataset()
    time.sleep(5)
    return ds


with Flow("sample_flow") as flow:
    ds = foo_task()
%%time
flow.run()
[2022-09-06 03:25:23+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'sample_flow'
[2022-09-06 03:25:23+0000] INFO - prefect.TaskRunner | Task 'foo_task': Starting task run...
[2022-09-06 03:25:28+0000] INFO - prefect.TaskRunner | Task 'foo_task': Finished task run for task with final state: 'Success'
[2022-09-06 03:25:28+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
CPU times: user 22.2 ms, sys: 303 µs, total: 22.5 ms
Wall time: 5.03 s
<Success: "All reference tasks succeeded.">
%%time
flow.run()
[2022-09-06 03:25:28+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'sample_flow'
[2022-09-06 03:25:28+0000] INFO - prefect.TaskRunner | Task 'foo_task': Starting task run...
[2022-09-06 03:25:28+0000] INFO - prefect.TaskRunner | Task 'foo_task': Finished task run for task with final state: 'Cached'
[2022-09-06 03:25:28+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
CPU times: user 18.5 ms, sys: 0 ns, total: 18.5 ms
Wall time: 18.2 ms
<Success: "All reference tasks succeeded.">
store.keys()
['Tuesday-sample_flow-foo_task.zarr', 'bar.zarr']
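The key 'Tuesday-sample_flow-foo_task.zarr' comes from filling the target template with run context. Assuming the substitution behaves like Python's str.format (the context values below are hand-written stand-ins for prefect.context, not values read from it), the rendering looks like this. Because %A yields only the weekday name, a run on the following Tuesday would render the same key and therefore hit the existing cache entry.

```python
from datetime import datetime

template = "{date:%A}-{flow_name}-{task_name}.zarr"

# Hand-written stand-ins for the relevant prefect.context entries.
context = {
    "date": datetime(2022, 9, 6, 3, 25, 23),  # a Tuesday
    "flow_name": "sample_flow",
    "task_name": "foo_task",
}

target = template.format(**context)
print(target)  # Tuesday-sample_flow-foo_task.zarr

# One week later the weekday, and therefore the key, is the same.
next_week = dict(context, date=datetime(2022, 9, 13))
print(template.format(**next_week) == target)  # True
```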