ETL, an introductory example
This example walks through Flowmetal’s core programming model: flows as functions, error handling with Result, retry logic, and parallel execution.
Imports
Flowmetal uses package-qualified imports. The prelude is loaded for you, but we’re being explicit here to show what comes from where.
load("pkg.flowmetal.io/prelude/v1",
     "args",                 # Pattern for declaring flow inputs
     "flow",                 # The `flow` constructor
     "FlowContext",          # The context type used for performing actions
     "fctx",                 # Helper for getting the current FlowContext
     "Ok", "Err", "Result",  # Rust-style error handling
     "is_ok",                # Test whether a Result is Ok
     "fail",                 # Aborts the current flow with an Err
     "unwrap",               # Unwraps a `Result`, `fail`ing if it's `Err`
)
Declaring a flow
A flow() declaration creates a durable, checkpointed unit of execution. The implementation function is defined separately — flow() binds a name to a recipe, and the implementation is just a regular function that receives a FlowContext.
def _analyze_impl(fctx: FlowContext) -> str:
    return "some results"

analyze = flow(
    implementation = _analyze_impl,
)
Calling analyze() invokes it as a checkpoint — the runtime handles suspension and resumption transparently. The return value is wrapped in a Result: Ok("some results") on success, Err(...) on failure.
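The Ok/Err/is_ok/unwrap semantics can be modeled in a few lines of plain code. The sketch below is an illustrative stand-in, not Flowmetal's actual implementation — it represents a Result as a tagged tuple, and `fail` raises instead of Err'ing a flow:

```python
# Minimal stand-in for the Result semantics described above.
# Illustrative only — not Flowmetal's actual implementation.

def Ok(value):
    return ("ok", value)

def Err(error):
    return ("err", error)

def is_ok(result):
    return result[0] == "ok"

def fail(error):
    # In Flowmetal this would Err the current flow; here we just raise.
    raise RuntimeError(error)

def unwrap(result):
    if is_ok(result):
        return result[1]
    fail(result[1])

print(unwrap(Ok("some results")))  # → some results
```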
Helpers: sleep and retry
These are prelude functions, but there’s no magic — they’re plain Starlark you could write yourself.
sleep is a trivial flow wrapping the sleep action:
def _sleep_impl(fctx: FlowContext):
    fctx.actions.sleep(fctx.args.duration)

sleep = flow(
    args = {"duration": args.int()},
    implementation = _sleep_impl,
)
retry composes a flow with backoff logic. It returns a new callable that will try up to limit times:
def retry(subflow,
          should_retry = None,
          limit = 3,
          backoff = lambda count: 2 * count * count):
    def _inner(*args,
               limit = limit,
               should_retry = should_retry,
               backoff = backoff,
               **kwargs):
        for count in range(limit):
            result = subflow(*args, **kwargs)
            if is_ok(result):
                return result
            if should_retry and not should_retry(result):
                return result
            # Don't sleep after the final attempt
            if count + 1 < limit:
                sleep(duration = backoff(count))
        # We get here only by running out of tries
        return result
    return _inner
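The default quadratic backoff is easy to check by hand. Because `count` starts at zero, the first retry happens immediately; pass a different `backoff` (say, `lambda count: 2 ** count`) if you want a non-zero initial delay:

```python
# Delays produced by the default quadratic backoff for the first few attempts.
backoff = lambda count: 2 * count * count
schedule = [backoff(count) for count in range(5)]
print(schedule)  # → [0, 2, 8, 18, 32]
```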
Using it is one line — wrap any flow to get automatic retry:
check = retry(flow(implementation = _check_impl))
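Because `retry` is plain Starlark, its composition can be exercised outside the runtime. In this sketch, `Ok`/`Err`/`is_ok` and the no-op `sleep` are stand-ins, and `flaky` is a hypothetical subflow that fails twice before succeeding:

```python
# Illustrative model of retry() composition, runnable outside Flowmetal.
def Ok(value): return ("ok", value)
def Err(error): return ("err", error)
def is_ok(result): return result[0] == "ok"
def sleep(duration): pass  # stand-in for the sleep flow

def retry(subflow, should_retry=None, limit=3, backoff=lambda c: 2 * c * c):
    def _inner(*args, **kwargs):
        for count in range(limit):
            result = subflow(*args, **kwargs)
            if is_ok(result):
                return result
            if should_retry and not should_retry(result):
                return result
            sleep(duration=backoff(count))
        return result
    return _inner

calls = []
def flaky(x):
    # Hypothetical subflow: fails twice, then succeeds.
    calls.append(x)
    return Ok(x) if len(calls) >= 3 else Err("transient")

wrapped = retry(flaky)
print(wrapped("data"))  # → ('ok', 'data')
print(len(calls))       # → 3
```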
The ETL pipeline
The main flow demonstrates error handling and parallel execution. unwrap() propagates errors — if a flow fails, unwrap() calls fail() and the pipeline stops. For finer control, use is_ok() to branch explicitly.
def _etl_impl(fctx: FlowContext) -> Result:
    analyze_value = unwrap(analyze())
    check_results = check(analyze_value)
    if is_ok(check_results):
        # Run two flows in parallel
        save_task = fctx.actions.run(save, data = analyze_value)
        report_task = fctx.actions.run(report, data = analyze_value)
        # Wait for both to complete
        results = fctx.actions.wait([save_task, report_task])
        # Propagate any errors
        [unwrap(it) for it in results]
        return Ok(None)
    else:
        cleanup()
        return check_results
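The two-task pattern generalizes to a fan-out over a list: launch one task per item, then wait once for all of them. A sketch using the same `fctx.actions.run` / `fctx.actions.wait` API as above — the `items` argument and the `save` flow are assumptions for illustration:

```
def _fan_out_impl(fctx: FlowContext) -> Result:
    # One task per input item, all launched before any is awaited
    tasks = [fctx.actions.run(save, data = item) for item in fctx.args.items]
    results = fctx.actions.wait(tasks)
    # Fail the whole flow if any task errored
    [unwrap(it) for it in results]
    return Ok(None)
```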
Putting it together
The flow declaration ties the implementation to its arguments. The __flow__ guard is analogous to Python’s __name__ == "__main__".
etl_workflow = flow(
    implementation = _etl_impl,
    args = {
        "input_data": args.string(default = "input_source"),
    },
)

if __name__ == "__flow__":
    etl_workflow()