Dynamic Fanout Fanin#

from hera import Parameter, Task, ValueFrom, Workflow


def generate():
    import json
    import sys

    # this can be anything! e.g fetch from some API, then in parallel process all entities; chunk database records
    # and process them in parallel, etc.
    json.dump([{"value": i} for i in range(10)], sys.stdout)


def fanout(object: dict):
    print(f"Received object: {object}!")
    # Output the content of the "value" key in the object
    value = object["value"]
    with open("/tmp/value", "w") as f:
        f.write(str(value))


def fanin(values: list):
    print(f"Received values: {values}!")


# assumes you used `hera.set_global_token` and `hera.set_global_host` so that the workflow can be submitted
with Workflow("dynamic-fanout-fanin") as w:
    generate_task = Task("generate", generate)
    fanout_task = Task(
        "fanout",
        fanout,
        with_param=generate_task.get_result(),
        outputs=[Parameter("value", value_from=ValueFrom(path="/tmp/value"))],
    )
    fanin_task = Task("fanin", fanin, inputs=[fanout_task.get_parameters_as("values")])

    generate_task >> fanout_task >> fanin_task

w.create()