Flowmetal

ETL, an introductory example

This example walks through Flowmetal’s core programming model: flows as functions, error handling with Result, retry logic, and parallel execution.

Imports

Flowmetal uses package-qualified imports. The prelude is loaded for you, but we’re being explicit here to show what comes from where.

load("pkg.flowmetal.io/prelude/v1",
     "args",                  # Pattern for declaring flow inputs
     "flow",                  # The `flow` constructor
     "FlowContext",           # The context type used for performing actions
     "fctx",          # Helper for getting the fctx
     "Ok", "Err", "Result",   # Rust-style error handling
     "is_ok",                 # Test whether a Result is Ok
     "fail",                  # Err's the current flow
     "unwrap",                # Unwraps a `Result`, `fail`ing if it's `Err`
)

Declaring a flow

A flow() declaration creates a durable, checkpointed unit of execution. The implementation function is defined separately — flow() binds a name to a recipe, and the implementation is just a regular function that receives a FlowContext.

def _analyze_impl(fctx: FlowContext) -> str:
    return "some results"

analyze = flow(
    implementation = _analyze_impl,
)

Calling analyze() invokes it as a checkpoint — the runtime handles suspension and resumption transparently. The return value is wrapped in a Result: Ok("some results") on success, Err(...) on failure.
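
For example, a caller inside another flow implementation can branch on the Result explicitly. A minimal sketch using only the prelude names above (_caller_impl is hypothetical):

def _caller_impl(fctx: FlowContext) -> Result:
    result = analyze()             # Runs as a checkpoint; returns a Result
    if is_ok(result):
        value = unwrap(result)     # Safe: we just checked is_ok
        return Ok("analyzed: " + value)
    return result                  # Propagate the Err unchanged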

Helpers: sleep and retry

These are prelude functions, but there’s no magic — they’re plain Starlark you could write yourself.

sleep is a trivial flow wrapping the sleep action:

def _sleep_impl(fctx: FlowContext):
    fctx.actions.sleep(fctx.args.duration)

sleep = flow(
    args = {"duration": args.int()},
    implementation = _sleep_impl,
)
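
Calling it looks like any other flow invocation and durably suspends the caller for the given duration. A one-line sketch (assuming the duration is in seconds; the units aren't specified here):

sleep(duration = 30)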

retry composes a flow with backoff logic. It returns a new callable that will try up to limit times:

def retry(subflow,
          should_retry = None,
          limit = 3,
          backoff = lambda count: 2 * count * count):
    def _inner(*args,
               limit=limit,
               should_retry=should_retry,
               backoff=backoff,
               **kwargs):
        for count in range(limit):
            result = subflow(*args, **kwargs)
            if is_ok(result):
                return result

            if should_retry and not should_retry(result):
                return result

            # Back off before the next attempt, but not after the final one
            if count + 1 < limit:
                sleep(duration = backoff(count))

        # We get here only by running out of tries
        return result

    return _inner

Using it is one line; wrap any flow to get automatic retry:

check = retry(flow(implementation = _check_impl))
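
The defaults are overridable per wrapper. A sketch with a hypothetical _fetch_impl, allowing five attempts with linear backoff:

fetch = retry(
    flow(implementation = _fetch_impl),
    limit = 5,
    backoff = lambda count: 10 * (count + 1),
)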

The ETL pipeline

The main flow demonstrates error handling and parallel execution. unwrap() propagates errors — if a flow fails, unwrap() calls fail() and the pipeline stops. For finer control, use is_ok() to branch explicitly.

def _etl_impl(fctx: FlowContext) -> Result:
    analyze_value = unwrap(analyze())

    check_result = check(analyze_value)
    if is_ok(check_result):
        # Run two flows in parallel
        save_task = fctx.actions.run(save, data = analyze_value)
        report_task = fctx.actions.run(report, data = analyze_value)

        # Wait for both to complete
        results = fctx.actions.wait([save_task, report_task])

        # Propagate any errors
        for task_result in results:
            unwrap(task_result)

        return Ok(None)
    else:
        cleanup()
        return check_result

Putting it together

The flow declaration ties the implementation to its arguments. The __flow__ guard is analogous to Python’s __name__ == "__main__".

etl_workflow = flow(
    implementation = _etl_impl,
    args = {
        "input_data": args.string(default="input_source"),
    },
)

if __name__ == "__flow__":
    etl_workflow()
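
Declared arguments can be overridden at the call site; given the default above, the bare call is equivalent to:

etl_workflow(input_data = "input_source")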

View the complete example