Dynamic Fanout Fanin Artifacts#

Example derived from: https://medium.com/@corvin/dynamic-fan-out-and-fan-in-in-argo-workflows-d731e144e2fd

from hera import Archive, GCSArtifact, Task, Workflow


def generate():
    import json
    import os
    import sys

    files = []
    # Don't output directly in /tmp/ as it could contain other files
    output_folder = "/tmp/output-files"
    os.makedirs(output_folder)
    for i in range(1, 4):
        filename = f"file{i}.txt"
        files.append(filename)
        with open(os.path.join(output_folder, filename), "w") as f:
            f.write(f"hello {i}")
    # Writing a JSON-compliant array of filenames to the output
    json.dump(files, sys.stdout)


def fanout_print():
    with open("input-file", "r") as f:
        file_content = f.read()
    print(f"file content: {file_content}")


# assumes you used `hera.set_global_token` and `hera.set_global_host` so that the workflow can be submitted
with Workflow("artifact-test") as wf:
    t1 = Task(
        "generate",
        generate,
        outputs=[
            GCSArtifact(
                "artifact-files",
                path="/tmp/output-files/",
                archive=Archive(disable_compression=True),
                bucket="<your bucket name>",
                key=f"fanout-{wf.get_name()}",
            )
        ],
    )

    t2 = Task(
        "fanout-print",
        fanout_print,
        with_param=t1.get_result(),
        inputs=[  # {{item}} refers to each element (file) in t1.get_result()
            t1.get_artifact("artifact-files").to_path("input-file", sub_path="{{item}}")
        ],
    )

    t3 = Task(
        "fanin",
        image="alpine:latest",
        command=["sh", "-c"],
        args=["ls /tmp/output-files/"],
        inputs=[t1.get_artifact("artifact-files")],
    )

    t1 >> t2 >> t3

wf.create()