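# The prelude is loaded for you automatically;
# this explicit load only shows where each name comes from.
# NOTE(review): `forever`, used below, is not listed here - presumably it
# is also provided by the prelude; confirm.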
load("pkg.flowmetal.io/prelude/v1",
"args",
"current_flow",
"flow",
"unwrap",
)
# Declare the `listen_reply` flow, used to bounce messages between channels.
listen_reply = flow(
    implementation = _listen_reply_impl,
    args = {
        # Inbound channel: declared readable, not writable.
        "listen_channel": args.channel(write=False, read=True),
        # Outbound channel: declared writable, not readable.
        "reply_channel": args.channel(read=False, write=True),
        # String argument carrying the reply text.
        "reply_msg": args.string(),
    },
)
# `print` is not a special form; the prelude already provides one.
# It is spelled out here to show how it is built from flow actions.
def print(*pos, sep=" ", end="\n", file=None, timeout=None):
    """Join the positional values into one message and write it to a channel.

    Args:
      *pos: values to emit; each one is converted with `str`.
      sep: separator placed between the converted values.
      end: suffix appended to the message when it is truthy.
      file: destination channel; when None, the current flow's `_stdout`
        argument is used.
      timeout: forwarded unchanged to the `write` action.

    Returns:
      The result of the `write` action, as-is (it is not unwrapped here).
    """
    # `== None` is kept deliberately: Starlark-style dialects have no `is`.
    target = file
    if target == None:
        # Implicit default channel argument of the running flow.
        target = current_flow().args._stdout

    text = sep.join([str(value) for value in pos])
    text = text + end if end else text

    return current_flow().actions.write(target, text, timeout=timeout)
def _listen_reply_impl(fctx):
    """Body of the `listen_reply` flow.

    On every iteration: read one message from `listen_channel`, print the
    date, the iteration counter and the message, then write `reply_msg` to
    `reply_channel`. Loops for as long as `forever()` keeps yielding.

    Args:
      fctx: the flow context; `fctx.args` carries the declared arguments
        and `fctx.actions` the available actions.
    """
    for counter in forever():  # Yields an ever-increasing counter
        # No timeout: wait until a message is available.
        incoming = fctx.actions.read(fctx.args.listen_channel, timeout=None)
        msg = unwrap(incoming)

        # Log the date, the loop counter and the received message.
        stamp = unwrap(fctx.actions.date())
        print(stamp, counter, msg)

        # Answer on the reply channel.
        # Blocks if the destination channel is full.
        sent = fctx.actions.write(
            fctx.args.reply_channel,
            fctx.args.reply_msg,
            timeout=None,
        )
        unwrap(sent)
# Entry point: runs only when this file is executed as a flow.
# Builds two channels, spawns two `listen_reply` flows wired in opposite
# directions, then injects the first message to start the exchange.
if __name__ == "__flow__":
    fctx = current_flow()

    # Channel a -> b.
    ab = unwrap(fctx.actions.channel(
        # Unbounded is the default.
        # Writes block writers if full.
        buffer=1,
        # No rate limit is the default.
        # Writes block writers if too fast.
        ratelimit=1_000,
    ))
    # Channel b -> a, configured the same way.
    ba = unwrap(fctx.actions.channel(
        buffer=1,
        ratelimit=1_000,
    ))

    # Create an infinite message loop: each flow listens on one channel and
    # replies on the other. The lambdas ignore whatever arguments `spawn`
    # passes them.
    unwrap(fctx.actions.spawn(
        lambda *_, **__: listen_reply(
            listen_channel = ab,
            reply_channel = ba,
            reply_msg = "ping",
        ),
    ))
    unwrap(fctx.actions.spawn(
        lambda *_, **__: listen_reply(
            listen_channel = ba,
            reply_channel = ab,
            reply_msg = "pong",
        ),
    ))

    # Send a kickoff message into the ping-pong loop.
    # The result is unwrapped like every other action here, so a failed
    # kickoff is reported instead of silently leaving the loop idle.
    unwrap(fctx.actions.write(ab, "<start>"))