apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: artifact-with-fanout-
spec:
entrypoint: d
templates:
- dag:
tasks:
- name: writer
template: writer
- arguments:
parameters:
- name: w
value: '{"name": "test", "archive": null, "archive_logs": null, "artifact_gc":
null, "deleted": null, "from_": "{{tasks.writer.outputs.artifacts.test}}",
"from_expression": null, "global_name": null, "mode": null, "path":
"/file", "recurse_mode": null, "sub_path": null}'
depends: writer
name: fanout
template: fanout
- arguments:
parameters:
- name: i
value: '{{item}}'
depends: fanout
name: consumer
template: consumer
withParam: '{{tasks.fanout.outputs.result}}'
name: d
- name: writer
outputs:
artifacts:
- name: test
path: /file
script:
command:
- python
image: python:3.8
source: "import os\nimport sys\nsys.path.append(os.getcwd())\nimport json\n\
with open('/file', 'w+') as f:\n for i in range(10):\n f.write(json.dumps(i)\
\ + '\\n')"
- inputs:
artifacts:
- name: test
path: /file
name: fanout
script:
command:
- python
image: python:3.8
source: "import os\nimport sys\nsys.path.append(os.getcwd())\nimport json\n\
import sys\nindices = []\nwith open('/file', 'r') as f:\n for line in f.readlines():\n\
\ indices.append(line.strip())\njson.dump(indices, sys.stdout)"
- inputs:
parameters:
- name: i
name: consumer
script:
command:
- python
image: python:3.8
source: 'import os
import sys
sys.path.append(os.getcwd())
import json
try: i = json.loads(r''''''{{inputs.parameters.i}}'''''')
except: i = r''''''{{inputs.parameters.i}}''''''
print(i)'