Flowmetal by example

ETL, an introductory example

# Flowmetal uses package source qualified imports.
# The syntax of a load is `load(<source>, *to_import, **renames)`.
# The prelude is loaded for you, but we're being explicit.
load("pkg.flowmetal.io/v1/prelude",
     "args",                  # Flow inputs model
     "flow",                  # The `flow` constructor
     "FlowContext",           # Context type for performing actions
     "current_flow",          # Helper for getting the fctx
     "Ok", "Err", "Result",   # Rust-style error handling
     "fail",                  # Err's the current task or workflow
     "unwrap",                # Unwrap or `fail()` results
)
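
# For example, the `**renames` form noted above lets you rebind an
# exported name on import (illustration only):
#
#   load("pkg.flowmetal.io/v1/prelude", result = "Result")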

# Implementation functions do whatever you need and return results.
# Flowmetal can do internal data processing, but is designed to
# coordinate any heavy lifting as external operations and track
# metadata within a flow.
def _analyze_impl(fctx: FlowContext) -> str:
    return "some results"

# Flows are units of error handling.
# If the task succeeds, the value of the task call is Ok(<return val>).
# If the task fails, the return value is Err(...).
# Tasks are, by default, blocking but can be used as futures.
analyze = flow(
    implementation = _analyze_impl,
)

# Flows perform side-effects by invoking actions. Actions are things
# like creating or using resources, making requests and such. Wrap the
# `sleep` action so it's a bit nicer to use. This is in the prelude.
def sleep(duration):
    current_flow().actions.sleep(duration)
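
# Usage from within any flow is then simply, e.g., `sleep(30)`.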

# Flows are failable, but we can easily implement retrying.
# This helper wraps a flow in retry behavior with sensible defaults.
# A version of this is in the prelude, but there's no magic to it.
def retry(subflow,
          should_retry = None,
          limit = 3,
          backoff = lambda count: 2 * count * count):
    def _inner(*args,
               limit=limit,
               should_retry=should_retry,
               backoff=backoff,
               **kwargs):
        for count in range(limit):
            result = subflow(*args, **kwargs)
            if is_ok(result):
                return result

            # We know we have an error. Should we give up?
            if should_retry and not should_retry(result):
                return result

            # Apply backoff according to the strategy, unless this was
            # the final attempt
            if count + 1 < limit:
                sleep(backoff(count))

        # We get here only by running out of tries
        return result

    return _inner
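
# A usage sketch (the names here are hypothetical): retry a flaky
# fetch flow up to five times with linear backoff, giving up early on
# results the predicate rejects.
#
#   fetch = retry(flow(_fetch_impl),
#                 should_retry = lambda result: not is_fatal(result),
#                 limit = 5,
#                 backoff = lambda count: 10 * count)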

# This flow will fail!
def _check_impl(fctx: FlowContext) -> None:
    fail("Oh no!")

# But we can compose ourselves a flow that'll try to make it work.
# As written, this will try three times and then fail anyway.
check = retry(flow(_check_impl))

# As this is an example, mock out the remaining ETL stage tasks
save = report = cleanup = analyze

def _etl_impl(fctx: FlowContext) -> Result:
    # Call the analyze task, wait for it to be evaluated, and unwrap
    # the result. If the analyze task failed, then unwrap() will
    # propagate the error. Otherwise it'll give us back the task's
    # return value.
    analyze_value = unwrap(analyze())

    # Here we're taking the result and we'll handle the error
    # ourselves. We happen to know that check will always fail.
    check_results = check(analyze_value)
    if is_ok(check_results):
        # Run these two in parallel
        save_task = fctx.actions.run(save, analyze_value)
        report_task = fctx.actions.run(report, analyze_value)

        # Wait for their results
        results = fctx.actions.wait([save_task, report_task])

        # Cause a `fail()` if either errored
        for result in results:
            unwrap(result)

        # Otherwise success!
        return Ok(None)

    else:
        cleanup()
        return check_results

etl_workflow = flow(
    implementation = _etl_impl,
    args = {
        "input_data": args.string(default="input_source"),
    },
)
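
# Args are declared with defaults above; presumably a call site can
# override them, e.g.
#
#   etl_workflow(input_data = "some_other_source")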

if __name__ == "__flow__":
    # Our definition of this flow is invoking our reusable flow
    etl_workflow()