Flowmetal

Ping, Pong

# The prelude is loaded for you automatically,
# but this explicit load() shows what comes from where.
load("pkg.flowmetal.io/prelude/v1",
     "args",
     "current_flow",
     "flow",
     "unwrap",
 )

# Declare the flow that both sides of the exchange run: it reads from one
# channel and answers on another.
listen_reply = flow(
    implementation = _listen_reply_impl,
    args = {
        # Channel the flow waits for incoming messages on (read-only).
        "listen_channel": args.channel(read=True, write=False),
        # Channel the flow sends its replies on (write-only).
        "reply_channel": args.channel(read=False, write=True),
        # Text written to the reply channel for every message received.
        "reply_msg": args.string(),
    },
)

# `print` is not a magic builtin; the prelude provides it.
# This spells the same thing out by hand.
def print(*pos,
          sep=" ",
          end="\n",
          file=None,
          timeout=None):
    """Write the positional values to a channel as a single message.

    Args:
      *pos: values to print; each one is converted with str().
      sep: string placed between the converted values.
      end: string appended after the last value; skipped when falsy.
      file: channel to write to. When None, the current flow's
        `args._stdout` channel is used.
      timeout: passed through unchanged to the write action.

    Returns:
      Whatever `actions.write` returns; the result is not unwrapped here.
    """
    # Fall back to the flow's implicit stdout channel argument.
    # NOTE(review): `== None` is kept on purpose; this dialect looks like
    # Starlark, which has no `is` operator -- confirm before changing.
    target = current_flow().args._stdout if file == None else file

    rendered = sep.join([str(value) for value in pos])
    text = rendered + end if end else rendered

    return current_flow().actions.write(target, text, timeout=timeout)

def _listen_reply_impl(fctx):
    """Flow body: log every message received, then answer it.

    Each iteration reads one message from `fctx.args.listen_channel`,
    prints the date, the loop counter and the message, and then writes
    `fctx.args.reply_msg` to `fctx.args.reply_channel`. The loop has no
    exit condition of its own.

    Args:
      fctx: the flow context; `fctx.actions` and `fctx.args` are used.
    """
    # NOTE(review): `forever` is not named in the load() list at the top of
    # the file; presumably it comes from the implicitly loaded prelude and
    # counts up without end -- confirm.
    for iteration in forever():
        # timeout=None: wait on the read until a value is available.
        received = fctx.actions.read(
            fctx.args.listen_channel,
            timeout=None,
        )
        incoming = unwrap(received)

        # Log the date, the loop counter and the message.
        timestamp = unwrap(fctx.actions.date())
        print(timestamp, iteration, incoming)

        # Answer on the reply channel.
        # Blocks if the destination channel is full.
        sent = fctx.actions.write(
            fctx.args.reply_channel,
            fctx.args.reply_msg,
            timeout=None,
        )
        unwrap(sent)

# Entry point: wire two `listen_reply` flows together through a pair of
# channels and start them bouncing messages off each other.
if __name__ == "__flow__":
    fctx = current_flow()

    # NOTE(review): the action is spelled `channel` here to match
    # `args.channel` used in the flow declaration -- confirm against the
    # actions API.
    ab = unwrap(fctx.actions.channel(
        # Unbounded is the default.
        # Writes block writers if full.
        buffer=1,
        # No rate limit is the default.
        # Writes block writers if too fast.
        ratelimit=1_000,
    ))
    # Second channel with the same settings, for the opposite direction.
    ba = unwrap(fctx.actions.channel(
        buffer=1,
        ratelimit=1_000,
    ))

    # Create an infinite message loop:
    # this flow listens on `ab` and answers "ping" on `ba` ...
    unwrap(fctx.actions.spawn(
        lambda *_, **__: listen_reply(
            listen_channel = ab,
            reply_channel = ba,
            reply_msg = "ping",
        ),
    ))
    # ... and this one listens on `ba` and answers "pong" on `ab`.
    unwrap(fctx.actions.spawn(
        lambda *_, **__: listen_reply(
            listen_channel = ba,
            reply_channel = ab,
            reply_msg = "pong",
        ),
    ))

    # Send a kickoff message into the ping-pong loop.
    # Unwrapped like every other action result in this file, so a failed
    # kickoff is reported instead of being silently dropped.
    unwrap(fctx.actions.write(ab, "<start>"))