Artifact With Fanout#

from hera import Artifact, Task, Workflow


def writer():
    import json

    with open("/file", "w+") as f:
        for i in range(10):
            f.write(f"{json.dumps(i)}\n")


def fanout():
    import json
    import sys

    indices = []
    with open("/file", "r") as f:
        for line in f.readlines():
            indices.append(line.strip())
    json.dump(indices, sys.stdout)


def consumer(i: int):
    print(i)


# assumes you used `hera.set_global_token` and `hera.set_global_host` so that the workflow can be submitted
with Workflow("artifact-with-fanout") as w:
    w_t = Task("writer", writer, outputs=[Artifact("test", "/file")])
    f_t = Task(
        "fanout",
        fanout,
        inputs=[w_t.get_artifact("test")],
    )
    c_t = Task("consumer", consumer, with_param=f_t.get_result())
    w_t >> f_t >> c_t

w.create()