Dynamic Fanout Fanin#

from hera import Parameter, Task, ValueFrom, Workflow

def generate():
    import json
    import sys

    # this can be anything! e.g fetch from some API, then in parallel process all entities; chunk database records
    # and process them in parallel, etc.
    json.dump([{"value": i} for i in range(10)], sys.stdout)

def fanout(object: dict):
    print(f"Received object: {object}!")
    # Output the content of the "value" key in the object
    value = object["value"]
    with open("/tmp/value", "w") as f:

def fanin(values: list):
    print(f"Received values: {values}!")

# assumes you used `hera.set_global_token` and `hera.set_global_host` so that the workflow can be submitted
with Workflow("dynamic-fanout-fanin") as w:
    generate_task = Task("generate", generate)
    fanout_task = Task(
        outputs=[Parameter("value", value_from=ValueFrom(path="/tmp/value"))],
    fanin_task = Task("fanin", fanin, inputs=[fanout_task.get_parameters_as("values")])

    generate_task >> fanout_task >> fanin_task
