Skip to content

Artifact With Fanout

from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Workflow, script


@script(outputs=Artifact(name="out-art", path="/tmp/file", archive=NoneArchiveStrategy()))
def writer():
    import json

    with open("/tmp/file", "w+") as f:
        for i in range(10):
            f.write(json.dumps(i) + "\n")


@script(inputs=Artifact(name="in-art", path="/tmp/file"))
def fanout():
    import json
    import sys

    indices = []
    with open("/tmp/file", "r") as f:
        for line in f.readlines():
            indices.append(line.strip())
    json.dump(indices, sys.stdout)


@script()
def consumer(i: int):
    print(i)


with Workflow(generate_name="artifact-with-fanout-", entrypoint="d") as w:
    with DAG(name="d"):
        w_ = writer()
        f = fanout(arguments=w_.get_artifact("out-art").with_name("in-art"))
        c = consumer(with_param=f.result)
        w_ >> f >> c
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: artifact-with-fanout-
spec:
  entrypoint: d
  templates:
  - dag:
      tasks:
      - name: writer
        template: writer
      - arguments:
          artifacts:
          - from: '{{tasks.writer.outputs.artifacts.out-art}}'
            name: in-art
        depends: writer
        name: fanout
        template: fanout
      - arguments:
          parameters:
          - name: i
            value: '{{item}}'
        depends: fanout
        name: consumer
        template: consumer
        withParam: '{{tasks.fanout.outputs.result}}'
    name: d
  - name: writer
    outputs:
      artifacts:
      - archive:
          none: {}
        name: out-art
        path: /tmp/file
    script:
      command:
      - python
      image: python:3.8
      source: |-
        import os
        import sys
        sys.path.append(os.getcwd())
        import json
        with open('/tmp/file', 'w+') as f:
            for i in range(10):
                f.write(json.dumps(i) + '\n')
  - inputs:
      artifacts:
      - name: in-art
        path: /tmp/file
    name: fanout
    script:
      command:
      - python
      image: python:3.8
      source: |-
        import os
        import sys
        sys.path.append(os.getcwd())
        import json
        import sys
        indices = []
        with open('/tmp/file', 'r') as f:
            for line in f.readlines():
                indices.append(line.strip())
        json.dump(indices, sys.stdout)
  - inputs:
      parameters:
      - name: i
    name: consumer
    script:
      command:
      - python
      image: python:3.8
      source: |-
        import os
        import sys
        sys.path.append(os.getcwd())
        import json
        try: i = json.loads(r'''{{inputs.parameters.i}}''')
        except: i = r'''{{inputs.parameters.i}}'''

        print(i)

Comments