# Flowmetal uses package source qualified imports.
# The syntax of a load is `load(<source>, *to_import, **renames)`.
# The prelude is loaded for you, but we're being explicit.
load("pkg.flowmetal.io/v1/prelude",
"args", # Flow inputs model
"flow", # The `flow` constructor
"FlowContext", # Context type for performing actions
"current_flow", # Helper for getting the fctx
"Ok", "Err", "Result", # Rust-style error handling
"fail", # Err's the current task or workflow
"unwrap", # Unwrap or `fail()` results
)
# Implementation functions do whatever you need and return results.
# Flowmetal can do internal data processing, but is designed to
# coordinate any heavy lifting as external operations and track
# metadata within a flow.
#
# This implementation ignores its context and returns a fixed string.
def _analyze_impl(fctx: FlowContext) -> str:
    return "some results"


# Flows are units of error handling.
# If the task succeeds, the value of the task call is Ok(<return val>).
# If the task fails, then the return value is Err(...).
# Tasks are, by default, blocking but can be used as futures.
#
# The implementation function is defined before this statement so that
# its name is already bound when `flow()` is evaluated.
analyze = flow(
    implementation = _analyze_impl,
)
# Side-effects in a flow happen through actions: creating or using
# resources, making requests and the like. This helper looks up the
# current flow's context and invokes its `sleep` action, so callers
# don't have to thread the context through. The prelude ships the same
# wrapper.
def sleep(duration):
    fctx = current_flow()
    fctx.actions.sleep(duration)
# Flows are failable, but we can easily implement retrying.
# We construct a helper function with default settings which will retry.
# This is in the prelude, but there's no magic.
#
# Arguments:
#   subflow:      the callable to invoke; it is expected to return a result
#                 that `is_ok()` can inspect.
#   should_retry: optional predicate over a failed result. When given and
#                 it returns a falsy value, the failure is returned at once.
#   limit:        maximum number of attempts. Values below 1 are treated
#                 as 1 so there is always a result to return.
#   backoff:      maps the zero-based attempt index to a sleep duration.
#
# Returns a wrapper which forwards its positional and keyword arguments to
# `subflow`. `limit`, `should_retry` and `backoff` may be overridden per
# call by keyword. The wrapper returns the first Ok result, or the last
# failed result once it gives up.
def retry(subflow,
          should_retry = None,
          limit = 3,
          backoff = lambda count: 2 * count * count):
    def _inner(*args,
               limit=limit,
               should_retry=should_retry,
               backoff=backoff,
               **kwargs):
        # Always make at least one attempt; otherwise `result` would never
        # be bound and there would be nothing to return.
        attempts = max(limit, 1)
        result = None

        for count in range(attempts):
            # Forward the caller's arguments to the wrapped flow.
            result = subflow(*args, **kwargs)
            if is_ok(result):
                return result

            # We know we have an error. Should we give up?
            if should_retry and not should_retry(result):
                return result

            # Apply backoff according to the strategy, but only when
            # another attempt follows -- sleeping after the final failure
            # would just delay reporting the error.
            if count + 1 < attempts:
                sleep(backoff(count))

        # We get here only by running out of tries
        return result

    return _inner
# An implementation that does nothing except call `fail()`, so any flow
# built from it fails.
def _check_impl(fctx: FlowContext) -> None:
    message = "Oh no!"
    fail(message)
# Composition: wrap the failing flow in `retry` with its default
# settings. With the default limit of three and no `should_retry`
# predicate, calling this makes three attempts and then fails.
check_results = retry(subflow = flow(_check_impl))
# This is only an example, so every remaining ETL stage is mocked as an
# alias of the `analyze` flow.
check = analyze
save = analyze
report = analyze
cleanup = analyze
# Implementation of the ETL workflow: analyze, check, then either save
# and report, or clean up.
#
# Returns Ok(None) when the check passes and both the save and report
# tasks succeed. When the check does not pass, runs cleanup and returns
# the check's own result.
def _etl_impl(fctx: FlowContext) -> Result:
    # Call the analyze task and wait for it to be evaluated & unwrap
    # the result. If the analyze task context failed, then unwrap()
    # will propagate the error. Otherwise it'll give us the return
    # value of the task back.
    analyze_value = unwrap(analyze())

    # Here we're taking the result and we'll handle the error
    # ourselves rather than unwrapping it.
    # NOTE(review): the original comment said check "will always fail",
    # but in this file `check` is mocked as an alias of `analyze` --
    # confirm which behavior the example is meant to show.
    check_result = check(analyze_value)
    if not is_ok(check_result):
        cleanup()
        return check_result

    # Run these two in parallel
    save_task = fctx.actions.run(save, analyze_value)
    report_task = fctx.actions.run(report, analyze_value)

    # Wait for their results
    results = fctx.actions.wait([save_task, report_task])

    # Cause a `fail()` if either errored
    for it in results:
        unwrap(it)

    # Otherwise success!
    return Ok(None)
# The ETL workflow, built from `_etl_impl`. It declares a single
# `input_data` argument via `args.string`, defaulting to "input_source".
etl_workflow = flow(
    args = {
        "input_data": args.string(default="input_source"),
    },
    implementation = _etl_impl,
)
# Entry point: invoke the workflow when `__name__` equals the literal below.
# NOTE(review): "__flow___" has two leading but three trailing
# underscores -- confirm this is the name the runtime actually sets;
# if it is a typo this guard never fires.
if __name__ == "__flow___":
    # Our definition of this flow is invoking our reusable flow
    etl_workflow()