Parallel Dag#

This example showcases how one can schedule a workflow with a parallel DAG task through Hera

from hera import DAG, Parameter, Task, Workflow


def produce(instruction: str):
    instruction_map = {"a": "x", "b": "y", "c": "z"}
    print(instruction_map[instruction])


def wrap(product: str):
    print(f"({product})")


def gather(products: str):
    print(products)


# assumes you used `hera.set_global_token` and `hera.set_global_host` so that the workflow can be submitted
with Workflow("parallel-dag") as wf:
    with DAG("pipeline", inputs=[Parameter("instruction")], outputs=[Parameter("instruction")]) as pipeline:
        t1 = Task("create", produce, inputs=[pipeline.get_parameter("instruction")])
        t2 = Task("wrap", wrap, inputs=[t1.get_result_as("product")])
        t1 >> t2
        pipeline.outputs = [t2.get_result_as("output")]

    t1 = Task("parallel-pipelines", dag=pipeline, with_param=["a", "b", "c"])
    t2 = Task("gather-results", gather, inputs=[t1.get_parameters_as("products")])
    t1 >> t2

wf.create()