Skip to content

Hera Workflows

hera.workflows

Hera classes

AWSElasticBlockStoreVolumeVolume

Source code in hera/workflows/volume.py
class AWSElasticBlockStoreVolumeVolume(_BaseVolume, _ModelAWSElasticBlockStoreVolumeSource):
    """Volume backed by an AWS Elastic Block Store disk."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the Argo volume object for this EBS source."""
        source = _ModelAWSElasticBlockStoreVolumeSource(
            fs_type=self.fs_type,
            partition=self.partition,
            read_only=self.read_only,
            volume_id=self.volume_id,
        )
        return _ModelVolume(name=self.name, aws_elastic_block_store=source)

AccessMode

A representation of the volume access modes for Kubernetes.

Notes

See: https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes for more information.

Source code in hera/workflows/volume.py
class AccessMode(Enum):
    """A representation of the Kubernetes persistent volume access modes.

    Notes
    -----
    See: https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes for more information.
    """

    read_write_once = "ReadWriteOnce"
    """
    Mounted as read-write by a single node. Multiple pods may still share the volume
    as long as they are scheduled on that same node.
    """

    read_only_many = "ReadOnlyMany"
    """Mounted as read-only by many nodes."""

    read_write_many = "ReadWriteMany"
    """Mounted as read-write by many nodes."""

    read_write_once_pod = "ReadWriteOncePod"
    """
    Mounted as read-write by a single pod across the whole cluster. Only supported
    for CSI volumes on Kubernetes 1.22+.
    """

    def __str__(self):
        # Values are already strings, so the value itself is the representation.
        return self.value

read_only_many class-attribute

read_only_many = 'ReadOnlyMany'

The volume can be mounted as read-only by many nodes

read_write_many class-attribute

read_write_many = 'ReadWriteMany'

The volume can be mounted as read-write by many nodes

read_write_once class-attribute

read_write_once = 'ReadWriteOnce'

The volume can be mounted as read-write by a single node. ReadWriteOnce access mode still can allow multiple pods to access the volume when the pods are running on the same node

read_write_once_pod class-attribute

read_write_once_pod = 'ReadWriteOncePod'

The volume can be mounted as read-write by a single Pod. Use ReadWriteOncePod access mode if you want to ensure that only one pod across whole cluster can read that PVC or write to it. This is only supported for CSI volumes and Kubernetes version 1.22+.

ArchiveStrategy

Source code in hera/workflows/archive.py
class ArchiveStrategy(BaseModel):
    """Base archive strategy; builds an empty Argo ArchiveStrategy object."""

    def _build_archive_strategy(self) -> _ModelArchiveStrategy:
        """Return the corresponding Argo archive strategy object."""
        strategy = _ModelArchiveStrategy()
        return strategy

Artifact

Source code in hera/workflows/artifact.py
class Artifact(BaseModel):
    """Base class for Argo artifacts used as template inputs and outputs."""

    name: str
    archive: Optional[Union[_ModelArchiveStrategy, ArchiveStrategy]] = None
    archive_logs: Optional[bool] = None
    artifact_gc: Optional[ArtifactGC] = None
    deleted: Optional[bool] = None
    from_: Optional[str] = None
    from_expression: Optional[str] = None
    global_name: Optional[str] = None
    mode: Optional[int] = None
    path: Optional[str] = None
    recurse_mode: Optional[str] = None
    sub_path: Optional[str] = None

    def _build_archive(self) -> Optional[_ModelArchiveStrategy]:
        """Return the Argo archive strategy, building it from the Hera wrapper if needed."""
        archive = self.archive
        # `None` and already-built Argo models pass through unchanged.
        if archive is None or isinstance(archive, _ModelArchiveStrategy):
            return archive
        return cast(ArchiveStrategy, archive)._build_archive_strategy()

    def _build_artifact(self) -> _ModelArtifact:
        """Assemble the Argo artifact object from this model's fields."""
        return _ModelArtifact(
            name=self.name,
            archive=self._build_archive(),
            archive_logs=self.archive_logs,
            artifact_gc=self.artifact_gc,
            deleted=self.deleted,
            from_=self.from_,
            from_expression=self.from_expression,
            global_name=self.global_name,
            mode=self.mode,
            path=self.path,
            recurse_mode=self.recurse_mode,
            sub_path=self.sub_path,
        )

    def _build_artifact_paths(self) -> _ModelArtifactPaths:
        """Build the artifact, then re-wrap its fields as an Argo ArtifactPaths object."""
        return _ModelArtifactPaths(**self._build_artifact().dict())

    def as_name(self, name: str) -> _ModelArtifact:
        """Return the built artifact with its name replaced by `name`."""
        built = self._build_artifact()
        built.name = name
        return built

archive class-attribute

archive: Optional[Union[_ModelArchiveStrategy, ArchiveStrategy]] = None

archive_logs class-attribute

archive_logs: Optional[bool] = None

artifact_gc class-attribute

artifact_gc: Optional[ArtifactGC] = None

deleted class-attribute

deleted: Optional[bool] = None

from_ class-attribute

from_: Optional[str] = None

from_expression class-attribute

from_expression: Optional[str] = None

global_name class-attribute

global_name: Optional[str] = None

mode class-attribute

mode: Optional[int] = None

name class-attribute

name: str

path class-attribute

path: Optional[str] = None

recurse_mode class-attribute

recurse_mode: Optional[str] = None

sub_path class-attribute

sub_path: Optional[str] = None

as_name

as_name(name)
Source code in hera/workflows/artifact.py
def as_name(self, name: str) -> _ModelArtifact:
    """Return the built artifact with its name replaced by `name`."""
    built = self._build_artifact()
    built.name = name
    return built

ArtifactoryArtifact

Source code in hera/workflows/artifact.py
class ArtifactoryArtifact(_ModelArtifactoryArtifact, Artifact):
    """An artifact sourced from Artifactory."""

    def _build_artifact(self) -> _ModelArtifact:
        """Attach the Artifactory source to the base-built artifact."""
        built = super()._build_artifact()
        built.artifactory = _ModelArtifactoryArtifact(
            url=self.url,
            password_secret=self.password_secret,
            username_secret=self.username_secret,
        )
        return built

AzureArtifact

Source code in hera/workflows/artifact.py
class AzureArtifact(_ModelAzureArtifact, Artifact):
    """An artifact sourced from Azure Blob Storage."""

    def _build_artifact(self) -> _ModelArtifact:
        """Attach the Azure source to the base-built artifact."""
        built = super()._build_artifact()
        built.azure = _ModelAzureArtifact(
            account_key_secret=self.account_key_secret,
            blob=self.blob,
            container=self.container,
            endpoint=self.endpoint,
            use_sdk_creds=self.use_sdk_creds,
        )
        return built

AzureDiskVolumeVolume

Source code in hera/workflows/volume.py
class AzureDiskVolumeVolume(_BaseVolume, _ModelAzureDiskVolumeSource):
    """Volume backed by an Azure data disk."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the Argo volume object for this Azure disk source."""
        source = _ModelAzureDiskVolumeSource(
            caching_mode=self.caching_mode,
            disk_name=self.disk_name,
            disk_uri=self.disk_uri,
            fs_type=self.fs_type,
            kind=self.kind,
            read_only=self.read_only,
        )
        return _ModelVolume(name=self.name, azure_disk=source)

AzureFileVolumeVolume

Source code in hera/workflows/volume.py
class AzureFileVolumeVolume(_BaseVolume, _ModelAzureFileVolumeSource):
    """Volume backed by an Azure File share."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the Argo volume object for this Azure File source."""
        source = _ModelAzureFileVolumeSource(
            read_only=self.read_only,
            secret_name=self.secret_name,
            share_name=self.share_name,
        )
        return _ModelVolume(name=self.name, azure_file=source)

CSIVolume

Source code in hera/workflows/volume.py
class CSIVolume(_BaseVolume, _ModelCSIVolumeSource):
    """Volume provided by an external CSI driver."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the Argo volume object for this CSI source."""
        source = _ModelCSIVolumeSource(
            driver=self.driver,
            fs_type=self.fs_type,
            node_publish_secret_ref=self.node_publish_secret_ref,
            read_only=self.read_only,
            volume_attributes=self.volume_attributes,
        )
        return _ModelVolume(name=self.name, csi=source)

CephFSVolumeVolume

Source code in hera/workflows/volume.py
class CephFSVolumeVolume(_BaseVolume, _ModelCephFSVolumeSource):
    """Volume backed by a CephFS mount."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the Argo volume object for this CephFS source."""
        source = _ModelCephFSVolumeSource(
            monitors=self.monitors,
            path=self.path,
            read_only=self.read_only,
            secret_file=self.secret_file,
            secret_ref=self.secret_ref,
            user=self.user,
        )
        return _ModelVolume(name=self.name, cephfs=source)

CinderVolume

Source code in hera/workflows/volume.py
class CinderVolume(_BaseVolume, _ModelCinderVolumeSource):
    """Volume backed by an OpenStack Cinder disk."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the Argo volume object for this Cinder source."""
        source = _ModelCinderVolumeSource(
            fs_type=self.fs_type,
            read_only=self.read_only,
            secret_ref=self.secret_ref,
            volume_id=self.volume_id,
        )
        return _ModelVolume(name=self.name, cinder=source)

ConfigMapEnv

Source code in hera/workflows/env.py
class ConfigMapEnv(_BaseEnv):
    """Environment variable sourced from a single key of a config map."""

    config_map_key: str
    config_map_name: Optional[str]
    optional: Optional[bool] = None

    def build(self) -> _ModelEnvVar:
        """Constructs and returns the Argo environment specification"""
        selector = _ModelConfigMapKeySelector(
            name=self.config_map_name,
            key=self.config_map_key,
            optional=self.optional,
        )
        return _ModelEnvVar(name=self.name, value_from=_ModelEnvVarSource(config_map_key_ref=selector))

config_map_key class-attribute

config_map_key: str

config_map_name class-attribute

config_map_name: Optional[str]

optional class-attribute

optional: Optional[bool] = None

build

build()

Constructs and returns the Argo environment specification

Source code in hera/workflows/env.py
def build(self) -> _ModelEnvVar:
    """Constructs and returns the Argo environment specification"""
    selector = _ModelConfigMapKeySelector(
        name=self.config_map_name,
        key=self.config_map_key,
        optional=self.optional,
    )
    return _ModelEnvVar(name=self.name, value_from=_ModelEnvVarSource(config_map_key_ref=selector))

ConfigMapEnvFrom

Source code in hera/workflows/env_from.py
class ConfigMapEnvFrom(_BaseEnvFrom, _ModelConfigMapEnvSource):
    """Exposes a config map's entries as environment variables."""

    def build(self) -> _ModelEnvFromSource:
        """Constructs and returns the Argo EnvFrom specification"""
        ref = _ModelConfigMapEnvSource(name=self.name, optional=self.optional)
        return _ModelEnvFromSource(prefix=self.prefix, config_map_ref=ref)

build

build()

Constructs and returns the Argo EnvFrom specification

Source code in hera/workflows/env_from.py
def build(self) -> _ModelEnvFromSource:
    """Constructs and returns the Argo EnvFrom specification"""
    ref = _ModelConfigMapEnvSource(name=self.name, optional=self.optional)
    return _ModelEnvFromSource(prefix=self.prefix, config_map_ref=ref)

ConfigMapVolume

Source code in hera/workflows/volume.py
class ConfigMapVolume(_BaseVolume, _ModelConfigMapVolumeSource):  # type: ignore
    """Volume that mounts a config map's entries as files."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the Argo volume object for this config map source."""
        # NOTE(review): the volume name doubles as the config map name here — confirm intentional.
        source = _ModelConfigMapVolumeSource(
            default_mode=self.default_mode,
            items=self.items,
            name=self.name,
            optional=self.optional,
        )
        return _ModelVolume(name=self.name, config_map=source)

Container

The Container template type defines a container to run on Argo.

Source code in hera/workflows/container.py
class Container(
    EnvIOMixin,
    ContainerMixin,
    TemplateMixin,
    ResourceMixin,
    VolumeMountMixin,
    CallableTemplateMixin,
):
    """The Container template type defines a container to run on Argo."""

    # Container-specific fields; image, probes, ports, env, resources, etc.
    # are inherited from the mixins above.
    args: Optional[List[str]] = None
    command: Optional[List[str]] = None
    lifecycle: Optional[Lifecycle] = None
    security_context: Optional[SecurityContext] = None
    working_dir: Optional[str] = None

    def _build_container(self) -> _ModelContainer:
        """Assemble the Argo Container spec from this template's fields."""
        return _ModelContainer(
            args=self.args,
            command=self.command,
            env=self._build_env(),
            env_from=self._build_env_from(),
            image=self.image,
            image_pull_policy=self._build_image_pull_policy(),
            lifecycle=self.lifecycle,
            liveness_probe=self.liveness_probe,
            ports=self.ports,
            readiness_probe=self.readiness_probe,
            resources=self._build_resources(),
            security_context=self.security_context,
            startup_probe=self.startup_probe,
            stdin=self.stdin,
            stdin_once=self.stdin_once,
            termination_message_path=self.termination_message_path,
            termination_message_policy=self.termination_message_policy,
            tty=self.tty,
            volume_devices=self.volume_devices,
            volume_mounts=self._build_volume_mounts(),
            working_dir=self.working_dir,
        )

    def _build_template(self) -> _ModelTemplate:
        """Assemble the Argo Template with this container embedded in it."""
        return _ModelTemplate(
            active_deadline_seconds=self.active_deadline_seconds,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            container=self._build_container(),
            daemon=self.daemon,
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            init_containers=self.init_containers,
            inputs=self._build_inputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            metrics=self.metrics,
            name=self.name,
            node_selector=self.node_selector,
            outputs=self._build_outputs(),
            plugin=self.plugin,
            pod_spec_patch=self.pod_spec_patch,
            priority=self.priority,
            priority_class_name=self.priority_class_name,
            # NOTE(review): container resource requirements are also passed to the
            # template-level `resource` field, which in the Argo schema describes a
            # resource-manipulation template rather than compute resources — the
            # container already receives them via `_build_container()` above; confirm
            # this duplication is intended.
            resource=self._build_resources(),
            retry_strategy=self.retry_strategy,
            scheduler_name=self.scheduler_name,
            # Pod-level security context is renamed `pod_security_context` in Hera to
            # avoid clashing with the container-level `security_context` field above.
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self.sidecars,
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
            volumes=self._build_volumes(),
        )

args class-attribute

args: Optional[List[str]] = None

command class-attribute

command: Optional[List[str]] = None

lifecycle class-attribute

lifecycle: Optional[Lifecycle] = None

security_context class-attribute

security_context: Optional[SecurityContext] = None

working_dir class-attribute

working_dir: Optional[str] = None

ContainerNode

Source code in hera/workflows/container_set.py
class ContainerNode(_ModelContainerNode, SubNodeMixin):
    """A node within a container set; supports `>>` chaining to express dependencies."""

    def next(self, other: ContainerNode) -> ContainerNode:
        """Record `self` as a dependency of `other` and return `other`."""
        assert issubclass(other.__class__, ContainerNode)
        deps = other.dependencies or []
        deps.append(self.name)
        # Deduplicate and keep a stable, sorted ordering.
        other.dependencies = sorted(set(deps))
        return other

    def __rrshift__(self, other: List[ContainerNode]) -> ContainerNode:
        """Support `[a, b] >> c`: every node on the left becomes a dependency of `self`."""
        assert isinstance(other, list), f"Unknown type {type(other)} specified using reverse right bitshift operator"
        for node in other:
            node.next(self)
        return self

    def __rshift__(
        self, other: Union[ContainerNode, List[ContainerNode]]
    ) -> Union[ContainerNode, List[ContainerNode]]:
        """Support `a >> b` and `a >> [b, c]` dependency chaining."""
        if isinstance(other, ContainerNode):
            return self.next(other)
        if isinstance(other, list):
            for node in other:
                assert isinstance(
                    node, ContainerNode
                ), f"Unknown list item type {type(node)} specified using right bitshift operator `>>`"
                self.next(node)
            return other
        raise ValueError(f"Unknown type {type(other)} provided to `__rshift__`")

next

next(other)
Source code in hera/workflows/container_set.py
def next(self, other: ContainerNode) -> ContainerNode:
    """Record `self` as a dependency of `other` and return `other`."""
    assert issubclass(other.__class__, ContainerNode)
    deps = other.dependencies or []
    deps.append(self.name)
    # Deduplicate and keep a stable, sorted ordering.
    other.dependencies = sorted(set(deps))
    return other

ContainerSet

Source code in hera/workflows/container_set.py
class ContainerSet(
    EnvIOMixin,
    ContainerMixin,
    TemplateMixin,
    ResourceMixin,
    VolumeMountMixin,
    ContextMixin,
):
    """Template that runs multiple ContainerNodes with inter-node dependencies."""

    # NOTE(review): mutable class-level default — presumably safe because the model
    # base copies defaults per instance; confirm against the base model's behavior.
    containers: List[ContainerNode] = []
    container_set_retry_strategy: Optional[ContainerSetRetryStrategy] = None

    def _add_sub(self, node: Any):
        # Hook used when nodes are created inside this object's context
        # (via ContextMixin); only ContainerNodes are accepted.
        if not isinstance(node, ContainerNode):
            raise InvalidType(type(node))

        self.containers.append(node)

    def _build_container_set(self) -> _ModelContainerSetTemplate:
        """Assemble the Argo ContainerSetTemplate from the collected nodes."""
        return _ModelContainerSetTemplate(
            containers=self.containers,
            retry_strategy=self.container_set_retry_strategy,
            # NOTE(review): passes raw `self.volume_mounts`, whereas Container uses
            # `self._build_volume_mounts()` — confirm the difference is intentional.
            volume_mounts=self.volume_mounts,
        )

    def _build_template(self) -> _ModelTemplate:
        """Assemble the Argo Template with the container set embedded in it."""
        return _ModelTemplate(
            active_deadline_seconds=self.active_deadline_seconds,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            container_set=self._build_container_set(),
            daemon=self.daemon,
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            init_containers=self.init_containers,
            inputs=self._build_inputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            metrics=self.metrics,
            name=self.name,
            node_selector=self.node_selector,
            outputs=self._build_outputs(),
            plugin=self.plugin,
            pod_spec_patch=self.pod_spec_patch,
            priority=self.priority,
            priority_class_name=self.priority_class_name,
            # NOTE(review): resource requirements passed to the template-level
            # `resource` field, which in the Argo schema is a resource-manipulation
            # template — confirm this is intended (same pattern as Container).
            resource=self._build_resources(),
            retry_strategy=self.retry_strategy,
            scheduler_name=self.scheduler_name,
            # Pod-level security context; renamed to avoid clashing with the
            # container-level `security_context` field.
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self.sidecars,
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
            volumes=self._build_volumes(),
        )

container_set_retry_strategy class-attribute

container_set_retry_strategy: Optional[ContainerSetRetryStrategy] = None

containers class-attribute

containers: List[ContainerNode] = []

CronWorkflow

CronWorkflow allows a user to run a Workflow on a recurring basis.

NB: Hera’s CronWorkflow is a subclass of Workflow which means certain fields are renamed for compatibility, see cron_suspend and cron_status which are different from the Argo spec. See https://argoproj.github.io/argo-workflows/fields/#cronworkflow

Source code in hera/workflows/cron_workflow.py
class CronWorkflow(Workflow):
    """CronWorkflow allows a user to run a Workflow on a recurring basis.

    NB: Hera's CronWorkflow is a subclass of Workflow which means certain fields are renamed
    for compatibility, see `cron_suspend` and `cron_status` which are different from the Argo
    spec. See https://argoproj.github.io/argo-workflows/fields/#cronworkflow
    """

    concurrency_policy: Optional[str] = None
    failed_jobs_history_limit: Optional[int] = None
    schedule: str
    starting_deadline_seconds: Optional[int] = None
    successful_jobs_history_limit: Optional[int] = None
    # Renamed from the Argo `suspend`/`status` fields to avoid clashing with
    # the inherited Workflow fields (see class docstring).
    cron_suspend: Optional[bool] = None
    timezone: Optional[str] = None
    cron_status: Optional[CronWorkflowStatus] = None

    def build(self) -> TWorkflow:
        """Builds the CronWorkflow and its components into an Argo schema CronWorkflow object."""
        # Hooks may return a replacement object; rebind `self` to the result.
        self = self._dispatch_hooks()

        return _ModelCronWorkflow(
            api_version=self.api_version,
            kind=self.kind,
            metadata=ObjectMeta(
                annotations=self.annotations,
                cluster_name=self.cluster_name,
                creation_timestamp=self.creation_timestamp,
                deletion_grace_period_seconds=self.deletion_grace_period_seconds,
                deletion_timestamp=self.deletion_timestamp,
                finalizers=self.finalizers,
                generate_name=self.generate_name,
                generation=self.generation,
                labels=self.labels,
                managed_fields=self.managed_fields,
                name=self.name,
                namespace=self.namespace,
                owner_references=self.owner_references,
                resource_version=self.resource_version,
                self_link=self.self_link,
                uid=self.uid,
            ),
            spec=CronWorkflowSpec(
                concurrency_policy=self.concurrency_policy,
                failed_jobs_history_limit=self.failed_jobs_history_limit,
                schedule=self.schedule,
                starting_deadline_seconds=self.starting_deadline_seconds,
                successful_jobs_history_limit=self.successful_jobs_history_limit,
                # The renamed Hera field maps back to the Argo `suspend` name here.
                suspend=self.cron_suspend,
                timezone=self.timezone,
                workflow_metadata=None,
                # Build the inner Workflow via the parent class and reuse its spec.
                workflow_spec=super().build().spec,
            ),
            status=self.cron_status,
        )

    def create(self) -> TWorkflow:
        """Creates the CronWorkflow on the Argo cluster."""
        # NOTE: `assert` guards are stripped under `python -O`.
        assert self.workflows_service, "workflow service not initialized"
        assert self.namespace, "workflow namespace not defined"
        return self.workflows_service.create_cron_workflow(
            self.namespace, CreateCronWorkflowRequest(workflow=self.build())
        )

    def lint(self) -> TWorkflow:
        """Lints the CronWorkflow using the Argo cluster."""
        # NOTE: `assert` guards are stripped under `python -O`.
        assert self.workflows_service, "workflow service not initialized"
        assert self.namespace, "workflow namespace not defined"
        return self.workflows_service.lint_cron_workflow(
            self.namespace, LintCronWorkflowRequest(workflow=self.build())
        )

concurrency_policy class-attribute

concurrency_policy: Optional[str] = None

cron_status class-attribute

cron_status: Optional[CronWorkflowStatus] = None

cron_suspend class-attribute

cron_suspend: Optional[bool] = None

failed_jobs_history_limit class-attribute

failed_jobs_history_limit: Optional[int] = None

schedule class-attribute

schedule: str

starting_deadline_seconds class-attribute

starting_deadline_seconds: Optional[int] = None

successful_jobs_history_limit class-attribute

successful_jobs_history_limit: Optional[int] = None

timezone class-attribute

timezone: Optional[str] = None

build

build()

Builds the CronWorkflow and its components into an Argo schema CronWorkflow object.

Source code in hera/workflows/cron_workflow.py
def build(self) -> TWorkflow:
    """Builds the CronWorkflow and its components into an Argo schema CronWorkflow object."""
    self = self._dispatch_hooks()

    metadata = ObjectMeta(
        annotations=self.annotations,
        cluster_name=self.cluster_name,
        creation_timestamp=self.creation_timestamp,
        deletion_grace_period_seconds=self.deletion_grace_period_seconds,
        deletion_timestamp=self.deletion_timestamp,
        finalizers=self.finalizers,
        generate_name=self.generate_name,
        generation=self.generation,
        labels=self.labels,
        managed_fields=self.managed_fields,
        name=self.name,
        namespace=self.namespace,
        owner_references=self.owner_references,
        resource_version=self.resource_version,
        self_link=self.self_link,
        uid=self.uid,
    )
    spec = CronWorkflowSpec(
        concurrency_policy=self.concurrency_policy,
        failed_jobs_history_limit=self.failed_jobs_history_limit,
        schedule=self.schedule,
        starting_deadline_seconds=self.starting_deadline_seconds,
        successful_jobs_history_limit=self.successful_jobs_history_limit,
        suspend=self.cron_suspend,
        timezone=self.timezone,
        workflow_metadata=None,
        workflow_spec=super().build().spec,
    )
    return _ModelCronWorkflow(
        api_version=self.api_version,
        kind=self.kind,
        metadata=metadata,
        spec=spec,
        status=self.cron_status,
    )

create

create()

Creates the CronWorkflow on the Argo cluster.

Source code in hera/workflows/cron_workflow.py
def create(self) -> TWorkflow:
    """Creates the CronWorkflow on the Argo cluster."""
    assert self.workflows_service, "workflow service not initialized"
    assert self.namespace, "workflow namespace not defined"
    request = CreateCronWorkflowRequest(workflow=self.build())
    return self.workflows_service.create_cron_workflow(self.namespace, request)

lint

lint()

Lints the CronWorkflow using the Argo cluster.

Source code in hera/workflows/cron_workflow.py
def lint(self) -> TWorkflow:
    """Lints the CronWorkflow using the Argo cluster."""
    assert self.workflows_service, "workflow service not initialized"
    assert self.namespace, "workflow namespace not defined"
    request = LintCronWorkflowRequest(workflow=self.build())
    return self.workflows_service.lint_cron_workflow(self.namespace, request)

DAG

A DAG template invocator is used to define Task dependencies as an acyclic graph.

DAG implements the context manager interface, so it allows usage of with; any hera.workflows.task.Task objects instantiated inside the with block will be added to the DAG’s list of Tasks.

Source code in hera/workflows/dag.py
class DAG(IOMixin, TemplateMixin, ContextMixin):
    """A DAG template invocator is used to define Task dependencies as an acyclic graph.

    DAG implements the contextmanager interface so allows usage of `with`, under which any
    `hera.workflows.task.Task` objects instantiated will be added to the DAG's list of Tasks.
    """

    fail_fast: Optional[bool] = None
    target: Optional[str] = None
    # NOTE(review): mutable class-level default — presumably safe because the model
    # base copies defaults per instance; confirm against the base model's behavior.
    tasks: List[Union[Task, DAGTask]] = []

    def _add_sub(self, node: Any):
        # Hook used when tasks are created inside this object's context
        # (via ContextMixin); only Task objects are accepted.
        if not isinstance(node, Task):
            raise InvalidType(type(node))
        self.tasks.append(node)

    def _build_template(self) -> _ModelTemplate:
        """Assemble the Argo Template with the DAG embedded in it."""
        # Convert Hera Tasks to Argo DAGTasks; raw DAGTasks pass through unchanged.
        tasks = []
        for task in self.tasks:
            if isinstance(task, Task):
                tasks.append(task._build_dag_task())
            else:
                tasks.append(task)
        return _ModelTemplate(
            active_deadline_seconds=self.active_deadline_seconds,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            daemon=self.daemon,
            # NOTE(review): `fail_fast` is applied both to the DAG template here and to
            # the enclosing template below — confirm both are intended.
            dag=_ModelDAGTemplate(fail_fast=self.fail_fast, target=self.target, tasks=tasks),
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            init_containers=self.init_containers,
            inputs=self._build_inputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            metrics=self.metrics,
            name=self.name,
            node_selector=self.node_selector,
            outputs=self._build_outputs(),
            plugin=self.plugin,
            pod_spec_patch=self.pod_spec_patch,
            priority=self.priority,
            priority_class_name=self.priority_class_name,
            retry_strategy=self.retry_strategy,
            scheduler_name=self.scheduler_name,
            # Pod-level security context, renamed `pod_security_context` in Hera.
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self.sidecars,
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
        )

fail_fast class-attribute

fail_fast: Optional[bool] = None

target class-attribute

target: Optional[str] = None

tasks class-attribute

tasks: List[Union[Task, DAGTask]] = []

Data

Source code in hera/workflows/data.py
class Data(TemplateMixin, IOMixin):
    """Data template: loads an artifact source and applies transformation expressions to it."""

    # Source may be a raw Argo DataSource/ArtifactPaths or a Hera Artifact wrapper.
    source: Union[m.DataSource, m.ArtifactPaths, Artifact]
    # Each entry becomes a TransformationStep; entries are str()-converted when built.
    transformations: List[Union[str, Node]] = []

    def _build_source(self) -> m.DataSource:
        """Normalize `source` into an Argo DataSource."""
        if isinstance(self.source, m.DataSource):
            return self.source
        elif isinstance(self.source, m.ArtifactPaths):
            return m.DataSource(artifact_paths=self.source)
        # Hera Artifact: build its Argo ArtifactPaths representation first.
        return m.DataSource(artifact_paths=self.source._build_artifact_paths())

    def _build_data(self) -> m.Data:
        """Assemble the Argo Data object from the source and transformations."""
        return m.Data(
            source=self._build_source(),
            transformation=list(map(lambda expr: m.TransformationStep(expression=str(expr)), self.transformations)),
        )

    def _build_template(self) -> m.Template:
        """Assemble the Argo Template with the data spec embedded in it."""
        return m.Template(
            active_deadline_seconds=self.active_deadline_seconds,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            data=self._build_data(),
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            init_containers=self.init_containers,
            inputs=self._build_inputs(),
            outputs=self._build_outputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            name=self.name,
            node_selector=self.node_selector,
            plugin=self.plugin,
            priority=self.priority,
            priority_class_name=self.priority_class_name,
            retry_strategy=self.retry_strategy,
            scheduler_name=self.scheduler_name,
            # Pod-level security context, renamed `pod_security_context` in Hera.
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self.sidecars,
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
        )

source class-attribute

source: Union[m.DataSource, m.ArtifactPaths, Artifact]

transformations class-attribute

transformations: List[Union[str, Node]] = []

DownwardAPIVolume

Source code in hera/workflows/volume.py
class DownwardAPIVolume(_BaseVolume, _ModelDownwardAPIVolumeSource):
    """Volume exposing pod and container fields via the downward API."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the Argo volume object for this downward API source."""
        source = _ModelDownwardAPIVolumeSource(default_mode=self.default_mode, items=self.items)
        return _ModelVolume(name=self.name, downward_api=source)

EmptyDirVolume

Source code in hera/workflows/volume.py
class EmptyDirVolume(_BaseVolume, _ModelEmptyDirVolumeSource):
    """Ephemeral volume that shares the lifetime of the pod."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the Argo volume object for this empty dir source."""
        source = _ModelEmptyDirVolumeSource(medium=self.medium, size_limit=self.size_limit)
        return _ModelVolume(name=self.name, empty_dir=source)

Env

Source code in hera/workflows/env.py
class Env(_BaseEnv):
    """An environment variable whose value is either a literal or a generated workflow input parameter.

    When `value_from_input` is set, the env var is wired to an Argo input parameter whose name is a
    sanitised form of the env var's name (see `_sanitise_param_for_argo`). `value` and
    `value_from_input` are mutually exclusive.
    """

    # Literal value; non-string values are JSON-serialized when the env var is built.
    value: Optional[Any] = None
    # Source the value from a workflow input parameter instead of a literal.
    value_from_input: Optional[Union[str, Parameter]] = None

    @staticmethod
    def _sanitise_param_for_argo(v: str) -> str:
        """Argo has some strict parameter validation. To satisfy, we replace all ._ with a dash,
        take only first 32 characters from a-zA-Z0-9-, and append md5 digest of the original string."""
        # NOTE move this to some general purpose utils?
        replaced_dashes = v.translate(str.maketrans({e: "-" for e in "_."}))  # type: ignore
        legit_set = string.ascii_letters + string.digits + "-"
        legit_prefix = "".join(islice((c for c in replaced_dashes if c in legit_set), 32))
        hash_suffix = hashlib.md5(v.encode("utf-8")).hexdigest()
        return f"{legit_prefix}-{hash_suffix}"

    @root_validator(pre=True)
    @classmethod
    def _check_values(cls, values):
        """Reject specifying both `value` and `value_from_input`."""
        if values.get("value") is not None and values.get("value_from_input") is not None:
            raise ValueError("cannot specify both `value` and `value_from_input`")

        return values

    @property
    def param_name(self) -> str:
        """Name of the generated Argo input parameter backing this env var.

        Raises
        ------
        ValueError
            If `value_from_input` is not set, since no parameter is generated in that case.
        """
        if not self.value_from_input:
            raise ValueError(
                "unexpected use of `param_name` -- without value_from_input, no param should be generated"
            )
        return Env._sanitise_param_for_argo(self.name)

    def build(self) -> _ModelEnvVar:
        """Constructs and returns the Argo environment specification"""
        if self.value_from_input is not None:
            self.value = f"{{{{inputs.parameters.{self.param_name}}}}}"
        elif not isinstance(self.value, str):
            # Argo env var values must be strings; serialize any non-string literal as JSON.
            # (The original also had a no-op `self.value = self.value` branch for strings.)
            self.value = json.dumps(self.value)
        return _ModelEnvVar(name=self.name, value=self.value)

param_name property

param_name: str

value class-attribute

value: Optional[Any] = None

value_from_input class-attribute

value_from_input: Optional[Union[str, Parameter]] = None

build

build()

Constructs and returns the Argo environment specification

Source code in hera/workflows/env.py
def build(self) -> _ModelEnvVar:
    """Constructs and returns the Argo environment specification"""
    if self.value_from_input is not None:
        self.value = f"{{{{inputs.parameters.{self.param_name}}}}}"
    elif isinstance(self.value, str):
        self.value = self.value
    else:
        self.value = json.dumps(self.value)
    return _ModelEnvVar(name=self.name, value=self.value)

EphemeralVolume

Source code in hera/workflows/volume.py
class EphemeralVolume(_BaseVolume, _ModelEphemeralVolumeSource):
    """Volume representation of a Kubernetes ephemeral volume."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the ephemeral source fields."""
        source = _ModelEphemeralVolumeSource(volume_claim_template=self.volume_claim_template)
        return _ModelVolume(name=self.name, ephemeral=source)

ExistingVolume

Source code in hera/workflows/volume.py
class ExistingVolume(_BaseVolume, _ModelPersistentVolumeClaimVolumeSource):
    """Volume backed by a pre-existing persistent volume claim."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec referencing the existing PVC."""
        claim = _ModelPersistentVolumeClaimVolumeSource(
            claim_name=self.claim_name, read_only=self.read_only
        )
        return _ModelVolume(name=self.name, persistent_volume_claim=claim)

FCVolume

Source code in hera/workflows/volume.py
class FCVolume(_BaseVolume, _ModelFCVolumeSource):
    """Volume representation of a Fibre Channel disk."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the Fibre Channel source fields."""
        source = _ModelFCVolumeSource(
            fs_type=self.fs_type,
            lun=self.lun,
            read_only=self.read_only,
            target_ww_ns=self.target_ww_ns,
            wwids=self.wwids,
        )
        return _ModelVolume(name=self.name, fc=source)

FieldEnv

Source code in hera/workflows/env.py
class FieldEnv(_BaseEnv):
    """An environment variable sourced from a Kubernetes object field (downward API)."""

    # Path of the field to expose, e.g. `metadata.name`.
    field_path: str
    # API version of the field reference; falls back to the global configuration when unset.
    api_version: Optional[str] = None

    @validator("api_version")
    @classmethod
    def _check_api_version(cls, v):
        """Default the API version to the globally configured one when none is provided."""
        return global_config.api_version if v is None else v

    def build(self) -> _ModelEnvVar:
        """Constructs and returns the Argo environment specification"""
        selector = _ModelObjectFieldSelector(
            field_path=self.field_path,
            api_version=self.api_version,
        )
        return _ModelEnvVar(name=self.name, value_from=_ModelEnvVarSource(field_ref=selector))

api_version class-attribute

api_version: Optional[str] = None

field_path class-attribute

field_path: str

build

build()

Constructs and returns the Argo environment specification

Source code in hera/workflows/env.py
def build(self) -> _ModelEnvVar:
    """Constructs and returns the Argo environment specification"""
    return _ModelEnvVar(
        name=self.name,
        value_from=_ModelEnvVarSource(
            field_ref=_ModelObjectFieldSelector(
                field_path=self.field_path,
                api_version=self.api_version,
            )
        ),
    )

FlexVolume

Source code in hera/workflows/volume.py
class FlexVolume(_BaseVolume, _ModelFlexVolumeSource):
    """Volume representation of a Kubernetes FlexVolume."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the FlexVolume source fields."""
        source = _ModelFlexVolumeSource(
            driver=self.driver,
            fs_type=self.fs_type,
            options=self.options,
            read_only=self.read_only,
            secret_ref=self.secret_ref,
        )
        return _ModelVolume(name=self.name, flex_volume=source)

FlockerVolume

Source code in hera/workflows/volume.py
class FlockerVolume(_BaseVolume, _ModelFlockerVolumeSource):
    """Volume representation of a Flocker dataset."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the Flocker source fields."""
        source = _ModelFlockerVolumeSource(dataset_name=self.dataset_name, dataset_uuid=self.dataset_uuid)
        return _ModelVolume(name=self.name, flocker=source)

GCEPersistentDiskVolume

Source code in hera/workflows/volume.py
class GCEPersistentDiskVolume(_BaseVolume, _ModelGCEPersistentDiskVolumeSource):
    """Volume representation of a GCE persistent disk."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the GCE persistent-disk source fields."""
        source = _ModelGCEPersistentDiskVolumeSource(
            fs_type=self.fs_type, partition=self.partition, pd_name=self.pd_name, read_only=self.read_only
        )
        return _ModelVolume(name=self.name, gce_persistent_disk=source)

GCSArtifact

Source code in hera/workflows/artifact.py
class GCSArtifact(_ModelGCSArtifact, Artifact):
    """Artifact sourced from a Google Cloud Storage bucket."""

    def _build_artifact(self) -> _ModelArtifact:
        """Build the base artifact and attach the GCS source details."""
        built = super()._build_artifact()
        built.gcs = _ModelGCSArtifact(
            bucket=self.bucket,
            key=self.key,
            service_account_key_secret=self.service_account_key_secret,
        )
        return built

GitArtifact

Source code in hera/workflows/artifact.py
class GitArtifact(_ModelGitArtifact, Artifact):
    """Artifact sourced from a Git repository."""

    def _build_artifact(self) -> _ModelArtifact:
        """Build the base artifact and attach the Git source details."""
        built = super()._build_artifact()
        built.git = _ModelGitArtifact(
            branch=self.branch,
            depth=self.depth,
            disable_submodules=self.disable_submodules,
            fetch=self.fetch,
            insecure_ignore_host_key=self.insecure_ignore_host_key,
            password_secret=self.password_secret,
            repo=self.repo,
            revision=self.revision,
            single_branch=self.single_branch,
            ssh_private_key_secret=self.ssh_private_key_secret,
            username_secret=self.username_secret,
        )
        return built

GitRepoVolume

Source code in hera/workflows/volume.py
class GitRepoVolume(_BaseVolume, _ModelGitRepoVolumeSource):
    """Volume representation of a Git repository checkout."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the git-repo source fields."""
        source = _ModelGitRepoVolumeSource(
            directory=self.directory, repository=self.repository, revision=self.revision
        )
        return _ModelVolume(name=self.name, git_repo=source)

GlusterfsVolume

Source code in hera/workflows/volume.py
class GlusterfsVolume(_BaseVolume, _ModelGlusterfsVolumeSource):
    """Volume representation of a GlusterFS mount."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the GlusterFS source fields."""
        source = _ModelGlusterfsVolumeSource(endpoints=self.endpoints, path=self.path, read_only=self.read_only)
        return _ModelVolume(name=self.name, glusterfs=source)

HDFSArtifact

Source code in hera/workflows/artifact.py
class HDFSArtifact(Artifact):
    """Artifact sourced from HDFS.

    The HDFS `path` is exposed as `hdfs_path` to avoid clashing with the base `Artifact.path`.
    """

    # note that `HDFSArtifact` does not inherit from the auto-generated `HDFSArtifact` because there's a
    # conflict in `path` with the base class `Artifact`. Here, we redefine the HDFS `path` to `hdfs_path` to
    # differentiate between the parent class and the child class `path`
    hdfs_path: str
    addresses: Optional[List[str]] = None
    force: Optional[bool] = None
    # Explicit `= None` default added for consistency with every other optional field on this class.
    hdfs_user: Optional[str] = None
    krb_c_cache_secret: Optional[SecretKeySelector] = None
    krb_config_config_map: Optional[SecretKeySelector] = None
    krb_keytab_secret: Optional[SecretKeySelector] = None
    krb_realm: Optional[str] = None
    krb_service_principal_name: Optional[str] = None
    krb_username: Optional[str] = None

    def _build_artifact(self) -> _ModelArtifact:
        """Build the base artifact and attach the HDFS source details."""
        artifact = super()._build_artifact()
        artifact.hdfs = _ModelHDFSArtifact(
            addresses=self.addresses,
            force=self.force,
            hdfs_user=self.hdfs_user,
            krb_c_cache_secret=self.krb_c_cache_secret,
            krb_config_config_map=self.krb_config_config_map,
            krb_keytab_secret=self.krb_keytab_secret,
            krb_realm=self.krb_realm,
            krb_service_principal_name=self.krb_service_principal_name,
            krb_username=self.krb_username,
            path=self.hdfs_path,  # map the renamed field back onto the model's `path`
        )
        return artifact

addresses class-attribute

addresses: Optional[List[str]] = None

force class-attribute

force: Optional[bool] = None

hdfs_path class-attribute

hdfs_path: str

hdfs_user class-attribute

hdfs_user: Optional[str]

krb_c_cache_secret class-attribute

krb_c_cache_secret: Optional[SecretKeySelector] = None

krb_config_config_map class-attribute

krb_config_config_map: Optional[SecretKeySelector] = None

krb_keytab_secret class-attribute

krb_keytab_secret: Optional[SecretKeySelector] = None

krb_realm class-attribute

krb_realm: Optional[str] = None

krb_service_principal_name class-attribute

krb_service_principal_name: Optional[str] = None

krb_username class-attribute

krb_username: Optional[str] = None

HTTP

Source code in hera/workflows/http_template.py
class HTTP(TemplateMixin, IOMixin):
    """A template that performs an HTTP request as a workflow step."""

    # Target URL of the request (required).
    url: str
    # Literal request body.
    body: Optional[str] = None
    # Dynamic request body source.
    body_from: Optional[HTTPBodySource] = None
    # Request headers.
    headers: Optional[List[HTTPHeader]] = None
    # Skip TLS certificate verification when true.
    insecure_skip_verify: Optional[bool] = None
    # HTTP method, e.g. GET/POST.
    method: Optional[str] = None
    # Expression that must evaluate true for the step to be considered successful.
    success_condition: Optional[str] = None
    # Request timeout in seconds.
    timeout_seconds: Optional[int] = None

    def _build_http_template(self) -> _ModelHTTP:
        """Assemble the HTTP portion of the template from this object's fields."""
        return _ModelHTTP(
            url=self.url,
            body=self.body,
            body_from=self.body_from,
            headers=self.headers,
            insecure_skip_verify=self.insecure_skip_verify,
            method=self.method,
            success_condition=self.success_condition,
            timeout_seconds=self.timeout_seconds,
        )

    def _build_template(self) -> _ModelTemplate:
        """Assemble the full workflow `Template`, embedding the HTTP spec built above."""
        return _ModelTemplate(
            active_deadline_seconds=self.active_deadline_seconds,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            http=self._build_http_template(),
            init_containers=self.init_containers,
            memoize=self.memoize,
            metadata=self._build_metadata(),
            inputs=self._build_inputs(),
            outputs=self._build_outputs(),
            name=self.name,
            node_selector=self.node_selector,
            plugin=self.plugin,
            priority=self.priority,
            priority_class_name=self.priority_class_name,
            retry_strategy=self.retry_strategy,
            scheduler_name=self.scheduler_name,
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self.sidecars,
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
        )

body class-attribute

body: Optional[str] = None

body_from class-attribute

body_from: Optional[HTTPBodySource] = None

headers class-attribute

headers: Optional[List[HTTPHeader]] = None

insecure_skip_verify class-attribute

insecure_skip_verify: Optional[bool] = None

method class-attribute

method: Optional[str] = None

success_condition class-attribute

success_condition: Optional[str] = None

timeout_seconds class-attribute

timeout_seconds: Optional[int] = None

url class-attribute

url: str

HTTPArtifact

Source code in hera/workflows/artifact.py
class HTTPArtifact(_ModelHTTPArtifact, Artifact):
    """Artifact fetched over HTTP."""

    def _build_artifact(self) -> _ModelArtifact:
        """Build the base artifact and attach the HTTP source details."""
        built = super()._build_artifact()
        built.http = _ModelHTTPArtifact(
            auth=self.auth,
            headers=self.headers,
            url=self.url,
        )
        return built

HostPathVolume

Source code in hera/workflows/volume.py
class HostPathVolume(_BaseVolume, _ModelHostPathVolumeSource):
    """Volume representation of a host filesystem path."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the host-path source fields."""
        source = _ModelHostPathVolumeSource(path=self.path, type=self.type)
        return _ModelVolume(name=self.name, host_path=source)

ISCSIVolume

Source code in hera/workflows/volume.py
class ISCSIVolume(_BaseVolume, _ModelISCSIVolumeSource):
    """Volume representation of an iSCSI disk."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the iSCSI source fields."""
        return _ModelVolume(
            name=self.name,
            iscsi=_ModelISCSIVolumeSource(
                chap_auth_discovery=self.chap_auth_discovery,
                # BUGFIX: was `chap_auth_session=self.chap_auth_discovery`, silently dropping
                # the user's session CHAP setting.
                chap_auth_session=self.chap_auth_session,
                fs_type=self.fs_type,
                initiator_name=self.initiator_name,
                iqn=self.iqn,
                iscsi_interface=self.iscsi_interface,
                lun=self.lun,
                portals=self.portals,
                read_only=self.read_only,
                secret_ref=self.secret_ref,
                target_portal=self.target_portal,
            ),
        )

InvalidDispatchType

Source code in hera/workflows/exceptions.py
class InvalidDispatchType(Exception):
    """Raised when a dispatcher receives a type it does not know how to handle."""

    ...

InvalidTemplateCall

Source code in hera/workflows/exceptions.py
class InvalidTemplateCall(Exception):
    """Raised when a template is called in a context where the call is not valid."""

    ...

InvalidType

Source code in hera/workflows/exceptions.py
class InvalidType(Exception):
    """Raised when an object of an unexpected type is supplied."""

    ...

NFSVolume

Source code in hera/workflows/volume.py
class NFSVolume(_BaseVolume, _ModelNFSVolumeSource):
    """Volume representation of an NFS share."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the NFS source fields."""
        # BUGFIX: `server` was previously omitted from the built source, producing an
        # invalid NFS volume — the Kubernetes NFS volume source requires `server`.
        return _ModelVolume(
            name=self.name,
            nfs=_ModelNFSVolumeSource(server=self.server, path=self.path, read_only=self.read_only),
        )

NoneArchiveStrategy

Source code in hera/workflows/archive.py
class NoneArchiveStrategy(ArchiveStrategy):
    """Archive strategy that disables archiving (stores the artifact as-is)."""

    def _build_archive_strategy(self) -> _ModelArchiveStrategy:
        # Emit an archive strategy whose `none` field is set, telling Argo not to archive.
        return _ModelArchiveStrategy(none=_ModelNoneStrategy())

OSSArtifact

Source code in hera/workflows/artifact.py
class OSSArtifact(_ModelOSSArtifact, Artifact):
    """Artifact sourced from Alibaba Cloud OSS."""

    def _build_artifact(self) -> _ModelArtifact:
        """Build the base artifact and attach the OSS source details."""
        built = super()._build_artifact()
        built.oss = _ModelOSSArtifact(
            access_key_secret=self.access_key_secret,
            bucket=self.bucket,
            create_bucket_if_not_present=self.create_bucket_if_not_present,
            endpoint=self.endpoint,
            key=self.key,
            lifecycle_rule=self.lifecycle_rule,
            secret_key_secret=self.secret_key_secret,
            security_token=self.security_token,
        )
        return built

Operator

Operator is a representation of mathematical comparison symbols. This can be used on tasks that execute conditionally based on the output of another task. Notes


The task that outputs its result needs to do so using stdout. See examples for a sample workflow.

Source code in hera/workflows/operator.py
class Operator(Enum):
    """Operator is a representation of mathematical comparison symbols.
     This can be used on tasks that execute conditionally based on the output of another task.
    Notes
    -----
    The task that outputs its result needs to do so using stdout. See `examples` for a sample workflow.
    """

    # Set-style operators (Kubernetes selector semantics).
    does_not_exist = "DoesNotExist"
    exists = "Exists"
    gt = "Gt"
    in_ = "In"  # trailing underscore avoids the `in` keyword
    lt = "Lt"
    not_in = "NotIn"

    # Expression operators used in Argo `when` conditions.
    equals = "=="
    greater = ">"
    less = "<"
    greater_equal = ">="
    less_equal = "<="
    not_equal = "!="
    or_ = "||"  # trailing underscore avoids the `or` keyword
    and_ = "&&"  # trailing underscore avoids the `and` keyword
    starts_with = "=~"  # NOTE(review): `=~` is a pattern-match operator; name suggests prefix match — confirm

    def __str__(self):
        # Render as the raw operator symbol so instances can be embedded in `when` expressions.
        return str(self.value)

and_ class-attribute

and_ = '&&'

does_not_exist class-attribute

does_not_exist = 'DoesNotExist'

equals class-attribute

equals = '=='

exists class-attribute

exists = 'Exists'

greater class-attribute

greater = '>'

greater_equal class-attribute

greater_equal = '>='

gt class-attribute

gt = 'Gt'

in_ class-attribute

in_ = 'In'

less class-attribute

less = '<'

less_equal class-attribute

less_equal = '<='

lt class-attribute

lt = 'Lt'

not_equal class-attribute

not_equal = '!='

not_in class-attribute

not_in = 'NotIn'

or_ class-attribute

or_ = '||'

starts_with class-attribute

starts_with = '=~'

Parallel

Parallel is used to add a list of steps which will run in parallel.

Parallel implements the contextmanager interface so allows usage of with, under which any hera.workflows.steps.Step objects instantiated will be added to Parallel’s list of sub_steps.

Source code in hera/workflows/steps.py
class Parallel(
    ContextMixin,
    SubNodeMixin,
):
    """Parallel is used to add a list of steps which will run in parallel.

    Parallel implements the contextmanager interface so allows usage of `with`, under which any
    `hera.workflows.steps.Step` objects instantiated will be added to Parallel's list of sub_steps.
    """

    sub_steps: List[Union[Step, _ModelWorkflowStep]] = []

    def _add_sub(self, node: Any):
        """Collect a `Step` created inside this context; reject any other type."""
        if isinstance(node, Step):
            self.sub_steps.append(node)
        else:
            raise InvalidType(type(node))

    def _build_step(self) -> List[_ModelWorkflowStep]:
        """Convert every collected sub-step into its `WorkflowStep` model form."""
        built: List[_ModelWorkflowStep] = []
        for sub in self.sub_steps:
            if isinstance(sub, Step):
                built.append(sub._build_as_workflow_step())
            elif isinstance(sub, _ModelWorkflowStep):
                built.append(sub)
            else:
                raise InvalidType(type(sub))
        return built

sub_steps class-attribute

sub_steps: List[Union[Step, _ModelWorkflowStep]] = []

Parameter

A Parameter is used to pass values in and out of templates.

They are to declare input and output parameters in the case of templates, and are used for Steps and Tasks to assign values.

Source code in hera/workflows/parameter.py
class Parameter(_ModelParameter):
    """A `Parameter` is used to pass values in and out of templates.

    They are to declare input and output parameters in the case of templates, and are used
    for Steps and Tasks to assign values.
    """

    # `MISSING` is the default value so that `Parameter` serialization understands the difference between a
    # missing value and a value of `None`, as set by a user. With this, when something sets a value of `None` it is
    # taken as a proper `None`. By comparison, if a user does not set a value, it is taken as `MISSING` and therefore
    # not serialized. This happens because the values if turned into an _actual_ `None` by `serialize` and therefore
    # Pydantic will not include it in the YAML that is passed to Argo
    value: Optional[Any] = MISSING
    default: Optional[Any] = MISSING

    @root_validator(pre=True, allow_reuse=True)
    def _check_values(cls, values):
        # `value` and `value_from` are mutually exclusive; serialize both `value` and `default`
        # up front so MISSING/None semantics are resolved before Pydantic field validation.
        if values.get("value") is not None and values.get("value_from") is not None:
            raise ValueError("Cannot specify both `value` and `value_from` when instantiating `Parameter`")

        values["value"] = serialize(values.get("value", MISSING))
        values["default"] = serialize(values.get("default", MISSING))

        return values

    def __str__(self):
        """Represent the parameter as a string by pointing to its value.

        This is useful in situations where we want to concatenate string values such as
        Task.args=["echo", wf.get_parameter("message")].
        """
        if self.value is None:
            raise ValueError("Cannot represent `Parameter` as string as `value` is not set")
        return self.value

    def with_name(self, name: str) -> Parameter:
        """Returns a copy of the parameter with the name set to the value"""
        p = self.copy(deep=True)
        p.name = name
        return p

    def as_input(self) -> _ModelParameter:
        """Assembles the parameter for use as an input of a template"""
        return _ModelParameter(
            name=self.name,
            description=self.description,
            default=self.default,
            enum=self.enum,
            value=self.value,
            value_from=self.value_from,
        )

    def as_argument(self) -> _ModelParameter:
        """Assembles the parameter for use as an argument of a step or a task"""
        # Setting a default value when used as an argument is a no-op so we exclude it as it would get overwritten by
        # `value` or `value_from` (one of which is required)
        # Overwrite ref: https://github.com/argoproj/argo-workflows/blob/781675ddcf6f1138d697cb9c71dae484daa0548b/workflow/common/util.go#L126-L139
        # One of value/value_from required ref: https://github.com/argoproj/argo-workflows/blob/ab178bb0b36a5ce34b4c1302cf4855879a0e8cf5/workflow/validate/validate.go#L794-L798
        return _ModelParameter(
            name=self.name,
            global_name=self.global_name,
            description=self.description,
            enum=self.enum,
            value=self.value,
            value_from=self.value_from,
        )

    def as_output(self) -> _ModelParameter:
        """Assembles the parameter for use as an output of a template"""
        # Only `value` and `value_from` are valid here
        # see https://github.com/argoproj/argo-workflows/blob/e3254eca115c9dd358e55d16c6a3d41403c29cae/workflow/validate/validate.go#L1067
        return _ModelParameter(
            name=self.name,
            global_name=self.global_name,
            description=self.description,
            value=self.value,
            value_from=self.value_from,
        )

default class-attribute

default: Optional[Any] = MISSING

value class-attribute

value: Optional[Any] = MISSING

as_argument

as_argument()

Assembles the parameter for use as an argument of a step or a task

Source code in hera/workflows/parameter.py
def as_argument(self) -> _ModelParameter:
    """Assembles the parameter for use as an argument of a step or a task"""
    # Setting a default value when used as an argument is a no-op so we exclude it as it would get overwritten by
    # `value` or `value_from` (one of which is required)
    # Overwrite ref: https://github.com/argoproj/argo-workflows/blob/781675ddcf6f1138d697cb9c71dae484daa0548b/workflow/common/util.go#L126-L139
    # One of value/value_from required ref: https://github.com/argoproj/argo-workflows/blob/ab178bb0b36a5ce34b4c1302cf4855879a0e8cf5/workflow/validate/validate.go#L794-L798
    return _ModelParameter(
        name=self.name,
        global_name=self.global_name,
        description=self.description,
        enum=self.enum,
        value=self.value,
        value_from=self.value_from,
    )

as_input

as_input()

Assembles the parameter for use as an input of a template

Source code in hera/workflows/parameter.py
def as_input(self) -> _ModelParameter:
    """Assembles the parameter for use as an input of a template"""
    return _ModelParameter(
        name=self.name,
        description=self.description,
        default=self.default,
        enum=self.enum,
        value=self.value,
        value_from=self.value_from,
    )

as_output

as_output()

Assembles the parameter for use as an output of a template

Source code in hera/workflows/parameter.py
def as_output(self) -> _ModelParameter:
    """Assembles the parameter for use as an output of a template"""
    # Only `value` and `value_from` are valid here
    # see https://github.com/argoproj/argo-workflows/blob/e3254eca115c9dd358e55d16c6a3d41403c29cae/workflow/validate/validate.go#L1067
    return _ModelParameter(
        name=self.name,
        global_name=self.global_name,
        description=self.description,
        value=self.value,
        value_from=self.value_from,
    )

with_name

with_name(name)

Returns a copy of the parameter with the name set to the value

Source code in hera/workflows/parameter.py
def with_name(self, name: str) -> Parameter:
    """Returns a copy of the parameter with the name set to the value"""
    p = self.copy(deep=True)
    p.name = name
    return p

PhotonPersistentDiskVolume

Source code in hera/workflows/volume.py
class PhotonPersistentDiskVolume(_BaseVolume, _ModelPhotonPersistentDiskVolumeSource):
    """Volume representation of a PhotonController persistent disk."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the Photon persistent-disk source fields."""
        source = _ModelPhotonPersistentDiskVolumeSource(fs_type=self.fs_type, pd_id=self.pd_id)
        return _ModelVolume(name=self.name, photon_persistent_disk=source)

PortworxVolume

Source code in hera/workflows/volume.py
class PortworxVolume(_BaseVolume, _ModelPortworxVolumeSource):
    """Volume representation of a Portworx volume."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the Portworx source fields."""
        source = _ModelPortworxVolumeSource(
            fs_type=self.fs_type, read_only=self.read_only, volume_id=self.volume_id
        )
        return _ModelVolume(name=self.name, portworx_volume=source)

ProjectedVolume

Source code in hera/workflows/volume.py
class ProjectedVolume(_BaseVolume, _ModelProjectedVolumeSource):
    """Volume representation of a Kubernetes projected volume."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the projected source fields."""
        source = _ModelProjectedVolumeSource(default_mode=self.default_mode, sources=self.sources)
        return _ModelVolume(name=self.name, projected=source)

QuobyteVolume

Source code in hera/workflows/volume.py
class QuobyteVolume(_BaseVolume, _ModelQuobyteVolumeSource):
    """Volume representation of a Quobyte mount."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the Quobyte source fields."""
        source = _ModelQuobyteVolumeSource(
            group=self.group,
            read_only=self.read_only,
            registry=self.registry,
            tenant=self.tenant,
            user=self.user,
            volume=self.volume,
        )
        return _ModelVolume(name=self.name, quobyte=source)

RBDVolume

Source code in hera/workflows/volume.py
class RBDVolume(_BaseVolume, _ModelRBDVolumeSource):
    """Volume representation of a Ceph RBD image."""

    def _build_volume(self) -> _ModelVolume:
        """Assemble the `Volume` spec carrying the RBD source fields."""
        source = _ModelRBDVolumeSource(
            fs_type=self.fs_type,
            image=self.image,
            keyring=self.keyring,
            monitors=self.monitors,
            pool=self.pool,
            read_only=self.read_only,
            secret_ref=self.secret_ref,
            user=self.user,
        )
        return _ModelVolume(name=self.name, rbd=source)

RawArtifact

Source code in hera/workflows/artifact.py
class RawArtifact(_ModelRawArtifact, Artifact):
    """Artifact whose content is supplied inline as raw data."""

    def _build_artifact(self) -> _ModelArtifact:
        """Build the base artifact and attach the raw data payload."""
        built = super()._build_artifact()
        built.raw = _ModelRawArtifact(data=self.data)
        return built

Resource

Source code in hera/workflows/resource.py
class Resource(TemplateMixin, SubNodeMixin, IOMixin):
    """A template that performs a Kubernetes resource operation (e.g. create/apply/delete)."""

    # The action to take on the resource, e.g. "create", "apply", "delete".
    action: str
    # Expression that, when true, marks the step as failed.
    failure_condition: Optional[str] = None
    # Extra flags passed to the resource operation.
    flags: Optional[List[str]] = None
    # Inline manifest of the resource to operate on.
    manifest: Optional[str] = None
    # Alternative source for the manifest.
    manifest_from: Optional[ManifestFrom] = None
    # Strategy used when action is "apply"/"patch".
    merge_strategy: Optional[str] = None
    # Whether to set an owner reference on the created resource.
    set_owner_reference: Optional[bool] = None
    # Expression that, when true, marks the step as succeeded.
    success_condition: Optional[str] = None

    def _build_resource_template(self) -> _ModelResourceTemplate:
        """Assemble the resource portion of the template from this object's fields."""
        return _ModelResourceTemplate(
            action=self.action,
            failure_condition=self.failure_condition,
            flags=self.flags,
            manifest=self.manifest,
            manifest_from=self.manifest_from,
            merge_strategy=self.merge_strategy,
            set_owner_reference=self.set_owner_reference,
            success_condition=self.success_condition,
        )

    def _build_template(self) -> _ModelTemplate:
        """Assemble the full workflow `Template`, embedding the resource spec built above."""
        return _ModelTemplate(
            active_deadline_seconds=self.active_deadline_seconds,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            daemon=self.daemon,
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            init_containers=self.init_containers,
            inputs=self._build_inputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            metrics=self.metrics,
            name=self.name,
            node_selector=self.node_selector,
            outputs=self._build_outputs(),
            parallelism=self.parallelism,
            plugin=self.plugin,
            pod_spec_patch=self.pod_spec_patch,
            priority=self.priority,
            priority_class_name=self.priority_class_name,
            resource=self._build_resource_template(),
            retry_strategy=self.retry_strategy,
            scheduler_name=self.scheduler_name,
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self.sidecars,
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
        )

action class-attribute

action: str

failure_condition class-attribute

failure_condition: Optional[str] = None

flags class-attribute

flags: Optional[List[str]] = None

manifest class-attribute

manifest: Optional[str] = None

manifest_from class-attribute

manifest_from: Optional[ManifestFrom] = None

merge_strategy class-attribute

merge_strategy: Optional[str] = None

set_owner_reference class-attribute

set_owner_reference: Optional[bool] = None

success_condition class-attribute

success_condition: Optional[str] = None

ResourceEnv

Source code in hera/workflows/env.py
class ResourceEnv(_BaseEnv):
    """An environment variable sourced from a container resource field (downward API)."""

    # Resource to select, e.g. `limits.cpu` or `requests.memory`.
    resource: str
    # Container whose resource is selected; defaults to the current container.
    container_name: Optional[str] = None
    # Output format divisor for the exposed value.
    divisor: Optional[Quantity] = None

    def build(self) -> _ModelEnvVar:
        """Construct the Argo env var spec backed by a resource field selector."""
        selector = _ModelResourceFieldSelector(
            container_name=self.container_name,
            divisor=self.divisor,
            resource=self.resource,
        )
        return _ModelEnvVar(name=self.name, value_from=_ModelEnvVarSource(resource_field_ref=selector))

container_name class-attribute

container_name: Optional[str] = None

divisor class-attribute

divisor: Optional[Quantity] = None

resource class-attribute

resource: str

build

build()
Source code in hera/workflows/env.py
def build(self) -> _ModelEnvVar:
    return _ModelEnvVar(
        name=self.name,
        value_from=_ModelEnvVarSource(
            resource_field_ref=_ModelResourceFieldSelector(
                container_name=self.container_name,
                divisor=self.divisor,
                resource=self.resource,
            )
        ),
    )

Resources

A representation of a collection of resources that are requested to be consumed by a task for execution.

This follow the K8S definition for resources.

Parameters

Optional[Union[float, int, str]] = None

The number of CPUs to request, either as a fraction (millicpu), whole number, or a string.

Optional[Union[int, str]] = None

The limit of CPUs to request, either as a fraction (millicpu), whole number, or a string.

Optional[str] = None

The amount of memory to request.

Optional[str] = None

The memory limit of the pod.

Optional[str] = None

The amount of ephemeral storage to request.

Optional[str] = None

The ephemeral storage limit of the pod.

Optional[int] = None

The number of GPUs to request.

Optional[str] = “nvidia.com/gpu”

The GPU flag to use for identifying how many GPUs to mount to a pod. This is dependent on the cloud provider.

Optional[Dict] = None

Any custom resources to request. This is dependent on the cloud provider.

Source code in hera/workflows/resources.py
class Resources(_BaseModel):
    """A representation of a collection of resources that are requested to be consumed by a task for execution.

    This follows the K8S definition for resources.

    Parameters
    ----------
    cpu_request: Optional[Union[float, int, str]] = None
        The number of CPUs to request, either as a fraction (millicpu), whole number, or a string.
    cpu_limit: Optional[Union[float, int, str]] = None
        The limit of CPUs to request, either as a fraction (millicpu), whole number, or a string.
    memory_request: Optional[str] = None
        The amount of memory to request.
    memory_limit: Optional[str] = None
        The memory limit of the pod.
    ephemeral_request: Optional[str] = None
        The amount of ephemeral storage to request.
    ephemeral_limit: Optional[str] = None
        The ephemeral storage limit of the pod.
    gpus: Optional[int] = None
        The number of GPUs to request.
    gpu_flag: Optional[str] = "nvidia.com/gpu"
        The GPU flag to use for identifying how many GPUs to mount to a pod. This is dependent on the cloud provider.
    custom_resources: Optional[Dict] = None
        Any custom resources to request. This is dependent on the cloud provider.
    """

    cpu_request: Optional[Union[float, int, str]] = None
    cpu_limit: Optional[Union[float, int, str]] = None
    memory_request: Optional[str] = None
    memory_limit: Optional[str] = None
    ephemeral_request: Optional[str] = None
    ephemeral_limit: Optional[str] = None
    gpus: Optional[int] = None
    gpu_flag: Optional[str] = "nvidia.com/gpu"
    custom_resources: Optional[Dict] = None

    @root_validator(pre=True)
    def _check_specs(cls, values):
        """Validates storage units and the CPU request/limit relationship before model construction."""
        cpu_request: Optional[Union[float, int, str]] = values.get("cpu_request")
        cpu_limit: Optional[Union[float, int, str]] = values.get("cpu_limit")
        memory_request: Optional[str] = values.get("memory_request")
        memory_limit: Optional[str] = values.get("memory_limit")
        ephemeral_request: Optional[str] = values.get("ephemeral_request")
        ephemeral_limit: Optional[str] = values.get("ephemeral_limit")

        if memory_request is not None:
            validate_storage_units(memory_request)
        if memory_limit is not None:
            validate_storage_units(memory_limit)

        if ephemeral_request is not None:
            validate_storage_units(ephemeral_request)
        # explicit `None` check for consistency with the sibling fields above; a truthiness
        # check would silently skip validation of an empty string
        if ephemeral_limit is not None:
            validate_storage_units(ephemeral_limit)

        # TODO: add validation for CPU units if str
        if cpu_limit is not None and isinstance(cpu_limit, int):
            assert cpu_limit >= 0, "CPU limit must be non-negative"
        if cpu_request is not None and isinstance(cpu_request, int):
            assert cpu_request >= 0, "CPU request must be non-negative"
            if cpu_limit is not None and isinstance(cpu_limit, int):
                assert cpu_request <= cpu_limit, "CPU request must be smaller or equal to limit"

        return values

    def build(self) -> _ModelResourceRequirements:
        """Builds the resource requirements of the pod"""
        resources: Dict = dict()

        if self.cpu_limit is not None:
            resources = _merge_dicts(resources, dict(limits=dict(cpu=str(self.cpu_limit))))

        if self.cpu_request is not None:
            resources = _merge_dicts(resources, dict(requests=dict(cpu=str(self.cpu_request))))

        if self.memory_limit is not None:
            resources = _merge_dicts(resources, dict(limits=dict(memory=self.memory_limit)))

        if self.memory_request is not None:
            resources = _merge_dicts(resources, dict(requests=dict(memory=self.memory_request)))

        if self.ephemeral_limit is not None:
            resources = _merge_dicts(resources, dict(limits={"ephemeral-storage": self.ephemeral_limit}))

        if self.ephemeral_request is not None:
            resources = _merge_dicts(resources, dict(requests={"ephemeral-storage": self.ephemeral_request}))

        if self.gpus is not None:
            # GPUs are set on both requests and limits, keyed by the provider-specific flag
            resources = _merge_dicts(resources, dict(requests={self.gpu_flag: str(self.gpus)}))
            resources = _merge_dicts(resources, dict(limits={self.gpu_flag: str(self.gpus)}))

        if self.custom_resources:
            resources = _merge_dicts(resources, self.custom_resources)

        return _ModelResourceRequirements(**resources)

cpu_limit class-attribute

cpu_limit: Optional[Union[float, int, str]] = None

cpu_request class-attribute

cpu_request: Optional[Union[float, int, str]] = None

custom_resources class-attribute

custom_resources: Optional[Dict] = None

ephemeral_limit class-attribute

ephemeral_limit: Optional[str] = None

ephemeral_request class-attribute

ephemeral_request: Optional[str] = None

gpu_flag class-attribute

gpu_flag: Optional[str] = 'nvidia.com/gpu'

gpus class-attribute

gpus: Optional[int] = None

memory_limit class-attribute

memory_limit: Optional[str] = None

memory_request class-attribute

memory_request: Optional[str] = None

build

build()

Builds the resource requirements of the pod

Source code in hera/workflows/resources.py
def build(self) -> _ModelResourceRequirements:
    """Builds the resource requirements of the pod"""
    spec: Dict = {}

    # Merge each configured request/limit into the spec so that the `limits` and
    # `requests` sub-dicts contributed by different fields are combined, not replaced.
    if self.cpu_limit is not None:
        spec = _merge_dicts(spec, {"limits": {"cpu": str(self.cpu_limit)}})

    if self.cpu_request is not None:
        spec = _merge_dicts(spec, {"requests": {"cpu": str(self.cpu_request)}})

    if self.memory_limit is not None:
        spec = _merge_dicts(spec, {"limits": {"memory": self.memory_limit}})

    if self.memory_request is not None:
        spec = _merge_dicts(spec, {"requests": {"memory": self.memory_request}})

    if self.ephemeral_limit is not None:
        spec = _merge_dicts(spec, {"limits": {"ephemeral-storage": self.ephemeral_limit}})

    if self.ephemeral_request is not None:
        spec = _merge_dicts(spec, {"requests": {"ephemeral-storage": self.ephemeral_request}})

    if self.gpus is not None:
        # GPUs go on both requests and limits, keyed by the provider-specific flag
        spec = _merge_dicts(spec, {"requests": {self.gpu_flag: str(self.gpus)}})
        spec = _merge_dicts(spec, {"limits": {self.gpu_flag: str(self.gpus)}})

    if self.custom_resources:
        spec = _merge_dicts(spec, self.custom_resources)

    return _ModelResourceRequirements(**spec)

RetryPolicy

Source code in hera/workflows/retry_strategy.py
class RetryPolicy(Enum):
    """The available policies for deciding which failed or errored steps to retry."""

    always = "Always"
    """Retry all failed steps"""

    on_failure = "OnFailure"
    """Retry steps whose main container is marked as failed in Kubernetes"""

    on_error = "OnError"
    """Retry steps that encounter Argo controller errors, or whose init or wait containers fail"""

    on_transient_error = "OnTransientError"
    """Retry steps that encounter errors defined as transient, or errors matching the `TRANSIENT_ERROR_PATTERN`
    environment variable.
    Available in version 3.0 and later.
    """

    def __str__(self):
        """Returns the Argo-facing string value of the policy."""
        return self.value

always class-attribute

always = 'Always'

Retry all failed steps

on_error class-attribute

on_error = 'OnError'

Retry steps that encounter Argo controller errors, or whose init or wait containers fail

on_failure class-attribute

on_failure = 'OnFailure'

Retry steps whose main container is marked as failed in Kubernetes

on_transient_error class-attribute

on_transient_error = 'OnTransientError'

Retry steps that encounter errors defined as transient, or errors matching the TRANSIENT_ERROR_PATTERN environment variable. Available in version 3.0 and later.

RetryStrategy

Source code in hera/workflows/retry_strategy.py
class RetryStrategy(_BaseModel):
    """Configuration for how, and how many times, a template is retried on failure."""

    affinity: Optional[RetryAffinity] = None
    backoff: Optional[Backoff] = None
    expression: Optional[str] = None
    limit: Optional[Union[int, str]] = None
    retry_policy: Optional[Union[str, RetryPolicy]] = None

    @validator("retry_policy", pre=True)
    def _convert_retry_policy(cls, v):
        """Normalizes a `RetryPolicy` member into its string value; strings and `None` pass through."""
        if v is None or isinstance(v, str):
            return v
        return cast(RetryPolicy, v).value

    @validator("limit", pre=True)
    def _convert_limit(cls, v):
        """Wraps a raw limit into an `IntOrString`; `None` and existing wrappers pass through."""
        if v is None or isinstance(v, IntOrString):
            return v
        return IntOrString(__root__=str(v))  # int or str

    def build(self) -> _ModelRetryStrategy:
        """Assembles the auto-generated retry strategy model from this instance's fields."""
        strategy_kwargs = dict(
            affinity=self.affinity,
            backoff=self.backoff,
            expression=self.expression,
            limit=self.limit,
            retry_policy=self.retry_policy,
        )
        return _ModelRetryStrategy(**strategy_kwargs)

affinity class-attribute

affinity: Optional[RetryAffinity] = None

backoff class-attribute

backoff: Optional[Backoff] = None

expression class-attribute

expression: Optional[str] = None

limit class-attribute

limit: Optional[Union[int, str]] = None

retry_policy class-attribute

retry_policy: Optional[Union[str, RetryPolicy]] = None

build

build()
Source code in hera/workflows/retry_strategy.py
def build(self) -> _ModelRetryStrategy:
    """Assembles the auto-generated retry strategy model from this instance's fields."""
    strategy_kwargs = {
        "affinity": self.affinity,
        "backoff": self.backoff,
        "expression": self.expression,
        "limit": self.limit,
        "retry_policy": self.retry_policy,
    }
    return _ModelRetryStrategy(**strategy_kwargs)

S3Artifact

Source code in hera/workflows/artifact.py
class S3Artifact(_ModelS3Artifact, Artifact):
    """An artifact sourced from an S3-compatible bucket."""

    def _build_artifact(self) -> _ModelArtifact:
        """Builds the base artifact and attaches the S3-specific source configuration."""
        s3_source = _ModelS3Artifact(
            access_key_secret=self.access_key_secret,
            bucket=self.bucket,
            create_bucket_if_not_present=self.create_bucket_if_not_present,
            encryption_options=self.encryption_options,
            endpoint=self.endpoint,
            insecure=self.insecure,
            key=self.key,
            region=self.region,
            role_arn=self.role_arn,
            secret_key_secret=self.secret_key_secret,
            use_sdk_creds=self.use_sdk_creds,
        )
        artifact = super()._build_artifact()
        artifact.s3 = s3_source
        return artifact

ScaleIOVolume

Source code in hera/workflows/volume.py
class ScaleIOVolume(_BaseVolume, _ModelScaleIOVolumeSource):
    """A volume sourced from ScaleIO storage."""

    def _build_volume(self) -> _ModelVolume:
        """Assembles the ScaleIO-backed volume model from this instance's fields."""
        source = _ModelScaleIOVolumeSource(
            fs_type=self.fs_type,
            gateway=self.gateway,
            protection_domain=self.protection_domain,
            read_only=self.read_only,
            secret_ref=self.secret_ref,
            ssl_enabled=self.ssl_enabled,
            storage_mode=self.storage_mode,
            storage_pool=self.storage_pool,
            system=self.system,
            volume_name=self.volume_name,
        )
        return _ModelVolume(name=self.name, scale_io=source)

Script

A Script acts as a wrapper around a container. In Hera this defaults to a “python:3.7” image specified by global_config.image, which runs a python source specified by Script.source.

Source code in hera/workflows/script.py
class Script(
    EnvIOMixin,
    CallableTemplateMixin,
    ContainerMixin,
    TemplateMixin,
    ResourceMixin,
    VolumeMountMixin,
):
    """A Script acts as a wrapper around a container. In Hera this defaults to a "python:3.7" image
    specified by global_config.image, which runs a python source specified by `Script.source`.
    """

    # Name assigned to the container running the script
    container_name: Optional[str] = None
    # Arguments appended to the container command
    args: Optional[List[str]] = None
    # Command that executes the rendered script; defaults to the globally configured script command
    command: Optional[List[str]] = global_config.script_command
    # Container lifecycle hooks
    lifecycle: Optional[Lifecycle] = None
    # Container-level security context
    security_context: Optional[SecurityContext] = None
    # Script payload: a callable whose body is rendered into a script, or a raw script string
    source: Optional[Union[Callable, str]] = None
    # Container working directory
    working_dir: Optional[str] = None
    # When True, a preamble appends the cwd to `sys.path` so the script can import sibling modules
    add_cwd_to_sys_path: bool = True

    def _get_param_script_portion(self) -> str:
        """Constructs and returns a script that loads the parameters of the specified arguments. Since Argo passes
        parameters through {{input.parameters.name}} it can be very cumbersome for users to manage that. This creates a
        script that automatically imports json and loads/adds code to interpret each independent argument into the
        script.

        Returns
        -------
        str
            The string representation of the script to load.
        """
        inputs = self._build_inputs()
        assert inputs
        extract = "import json\n"
        # parameters are sorted by name so the generated preamble is deterministic
        for param in sorted(inputs.parameters or [], key=lambda x: x.name):
            # Hera does not know what the content of the `InputFrom` is, coming from another task. In some cases
            # non-JSON encoded strings are returned, which fail the loads, but they can be used as plain strings
            # which is why this captures that in an except. This is only used for `InputFrom` cases as the extra
            # payload of the script is not necessary when regular input is set on the task via `func_params`
            extract += f"""try: {param.name} = json.loads(r'''{{{{inputs.parameters.{param.name}}}}}''')\n"""
            extract += f"""except: {param.name} = r'''{{{{inputs.parameters.{param.name}}}}}'''\n"""
        return textwrap.dedent(extract)

    def _build_source(self) -> str:
        """Assembles and returns a script representation of the given function, along with the extra script material
        prefixed to the string. The script is expected to be a callable function the client is interested in submitting
        for execution on Argo and the script_extra material represents the parameter loading part obtained, likely,
        through get_param_script_portion.

        Returns
        -------
        str
            Final formatted script.
        """
        if callable(self.source):
            signature = inspect.signature(self.source)
            args = inspect.getfullargspec(self.source).args
            # a `-> str` return annotation marks the callable as a resolver that returns the
            # final script text itself, rather than a function body to serialize
            if signature.return_annotation == str:
                # Resolve function by filling in templated inputs
                input_params_names = [p.name for p in self.inputs if isinstance(p, Parameter)]  # type: ignore
                missing_args = set(args) - set(input_params_names)
                if missing_args:
                    raise ValueError(f"Missing inputs for source args: {missing_args}")
                kwargs = {name: f"{{{{inputs.parameters.{name}}}}}" for name in args}
                # Resolve the function to a string
                return self.source(**kwargs)
            else:
                script = ""
                # Argo will save the script as a file and run it with cmd:
                # - python /argo/staging/script
                # However, this prevents the script from importing modules in its cwd,
                # since it's looking for files relative to the script path.
                # We fix this by appending the cwd path to sys:
                if self.add_cwd_to_sys_path:
                    script = "import os\nimport sys\nsys.path.append(os.getcwd())\n"

                script_extra = self._get_param_script_portion() if args else None
                if script_extra:
                    script += copy.deepcopy(script_extra)
                    script += "\n"

                # content represents the function components, separated by new lines
                # therefore, the actual code block occurs after the end parenthesis, which is a literal `):\n`
                content = inspect.getsourcelines(self.source)[0]
                token_index, start_token = 1, ":\n"
                for curr_index, curr_token in enumerate(content):
                    if start_token in curr_token:
                        # when we find the curr token we find the end of the function header. The next index is the
                        # starting point of the function body
                        token_index = curr_index + 1
                        break

                s = "".join(content[token_index:])
                script += textwrap.dedent(s)
                return textwrap.dedent(script)
        else:
            assert isinstance(self.source, str)
            return self.source

    def _build_inputs(self) -> Optional[ModelInputs]:
        """Merges explicitly declared inputs with parameters inferred from the callable `source`.

        Explicitly set parameters take precedence: an inferred parameter is only appended when no
        parameter of the same name is already present.
        """
        inputs = super()._build_inputs()
        func_parameters = _get_parameters_from_callable(self.source) if callable(self.source) else None

        if inputs is None and func_parameters is None:
            return None
        elif func_parameters is None:
            return inputs
        elif inputs is None:
            inputs = ModelInputs(parameters=func_parameters)

        already_set_params = {p.name for p in inputs.parameters or []}
        for param in func_parameters:
            if param.name not in already_set_params:
                inputs.parameters = [param] if inputs.parameters is None else inputs.parameters + [param]
        return inputs

    def _build_template(self) -> _ModelTemplate:
        """Assembles the Argo template model for this script template."""
        return _ModelTemplate(
            active_deadline_seconds=self.active_deadline_seconds,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            daemon=self.daemon,
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            init_containers=self.init_containers,
            inputs=self._build_inputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            metrics=self.metrics,
            name=self.name,
            node_selector=self.node_selector,
            outputs=self._build_outputs(),
            parallelism=self.parallelism,
            plugin=self.plugin,
            pod_spec_patch=self.pod_spec_patch,
            priority=self.priority,
            priority_class_name=self.priority_class_name,
            resource=self._build_resources(),
            retry_strategy=self.retry_strategy,
            scheduler_name=self.scheduler_name,
            script=self._build_script(),
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self.sidecars,
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
            volumes=self._build_volumes(),
        )

    def _build_script(self) -> _ModelScriptTemplate:
        """Assembles the Argo script specification from the container fields and the rendered source."""
        return _ModelScriptTemplate(
            args=self.args,
            command=self.command,
            env=self._build_env(),
            env_from=self._build_env_from(),
            image=self.image,
            image_pull_policy=self._build_image_pull_policy(),
            lifecycle=self.lifecycle,
            liveness_probe=self.liveness_probe,
            name=self.container_name,
            ports=self.ports,
            readiness_probe=self.readiness_probe,
            resources=self._build_resources(),
            security_context=self.security_context,
            source=self._build_source(),
            startup_probe=self.startup_probe,
            stdin=self.stdin,
            stdin_once=self.stdin_once,
            termination_message_path=self.termination_message_path,
            termination_message_policy=self.termination_message_policy,
            tty=self.tty,
            volume_devices=self.volume_devices,
            volume_mounts=self._build_volume_mounts(),
            working_dir=self.working_dir,
        )

add_cwd_to_sys_path class-attribute

add_cwd_to_sys_path: bool = True

args class-attribute

args: Optional[List[str]] = None

command class-attribute

command: Optional[List[str]] = global_config.script_command

container_name class-attribute

container_name: Optional[str] = None

lifecycle class-attribute

lifecycle: Optional[Lifecycle] = None

security_context class-attribute

security_context: Optional[SecurityContext] = None

source class-attribute

source: Optional[Union[Callable, str]] = None

working_dir class-attribute

working_dir: Optional[str] = None

SecretEnv

Source code in hera/workflows/env.py
class SecretEnv(_BaseEnv):
    """An environment variable whose value is pulled from a Kubernetes secret."""

    secret_key: str
    secret_name: Optional[str] = None
    optional: Optional[bool] = None

    def build(self) -> _ModelEnvVar:
        """Constructs and returns the Argo environment specification"""
        key_selector = _ModelSecretKeySelector(
            name=self.secret_name,
            key=self.secret_key,
            optional=self.optional,
        )
        return _ModelEnvVar(
            name=self.name,
            value_from=_ModelEnvVarSource(secret_key_ref=key_selector),
        )

optional class-attribute

optional: Optional[bool] = None

secret_key class-attribute

secret_key: str

secret_name class-attribute

secret_name: Optional[str] = None

build

build()

Constructs and returns the Argo environment specification

Source code in hera/workflows/env.py
def build(self) -> _ModelEnvVar:
    """Constructs and returns the Argo environment specification"""
    key_selector = _ModelSecretKeySelector(
        name=self.secret_name,
        key=self.secret_key,
        optional=self.optional,
    )
    source = _ModelEnvVarSource(secret_key_ref=key_selector)
    return _ModelEnvVar(name=self.name, value_from=source)

SecretEnvFrom

Source code in hera/workflows/env_from.py
class SecretEnvFrom(_BaseEnvFrom, _ModelSecretEnvSource):
    """An env-from source that exposes the entries of a Kubernetes secret as environment variables."""

    def build(self) -> _ModelEnvFromSource:
        """Constructs and returns the Argo EnvFrom specification"""
        secret_source = _ModelSecretEnvSource(name=self.name, optional=self.optional)
        return _ModelEnvFromSource(prefix=self.prefix, secret_ref=secret_source)

build

build()

Constructs and returns the Argo EnvFrom specification

Source code in hera/workflows/env_from.py
def build(self) -> _ModelEnvFromSource:
    """Constructs and returns the Argo EnvFrom specification"""
    secret_source = _ModelSecretEnvSource(name=self.name, optional=self.optional)
    return _ModelEnvFromSource(prefix=self.prefix, secret_ref=secret_source)

SecretVolume

Source code in hera/workflows/volume.py
class SecretVolume(_BaseVolume, _ModelSecretVolumeSource):
    """A volume that mounts the contents of a Kubernetes secret."""

    def _build_volume(self) -> _ModelVolume:
        """Assembles the secret-backed volume model from this instance's fields."""
        source = _ModelSecretVolumeSource(
            default_mode=self.default_mode,
            items=self.items,
            optional=self.optional,
            secret_name=self.secret_name,
        )
        return _ModelVolume(name=self.name, secret=source)

Step

Step is used to run a given template. Must be instantiated under a Steps or Parallel context, or outside of a Workflow.

Source code in hera/workflows/steps.py
class Step(
    TemplateInvocatorSubNodeMixin,
    ArgumentsMixin,
    SubNodeMixin,
    ParameterMixin,
    ItemMixin,
):
    """Step is used to run a given template. Must be instantiated under a Steps or Parallel context,
    or outside of a Workflow.
    """

    @property
    def id(self) -> str:
        """Argo templated reference to this step's id."""
        return f"{{{{steps.{self.name}.id}}}}"

    @property
    def ip(self) -> str:
        """Argo templated reference to this step's ip."""
        return f"{{{{steps.{self.name}.ip}}}}"

    @property
    def status(self) -> str:
        """Argo templated reference to this step's status."""
        return f"{{{{steps.{self.name}.status}}}}"

    @property
    def exit_code(self) -> str:
        """Argo templated reference to this step's exit code."""
        return f"{{{{steps.{self.name}.exitCode}}}}"

    @property
    def started_at(self) -> str:
        """Argo templated reference to this step's start time."""
        return f"{{{{steps.{self.name}.startedAt}}}}"

    @property
    def finished_at(self) -> str:
        """Argo templated reference to this step's finish time."""
        return f"{{{{steps.{self.name}.finishedAt}}}}"

    @property
    def result(self) -> str:
        """Argo templated reference to this step's output result."""
        return f"{{{{steps.{self.name}.outputs.result}}}}"

    def get_parameters_as(self, name):
        """Gets all the output parameters from this task"""
        return Parameter(name=name, value=f"{{{{steps.{self.name}.outputs.parameters}}}}")

    def get_parameter(self, name: str) -> Parameter:
        """Returns a Parameter from the task's outputs based on the name.

        Parameters
        ----------
        name: str
            The name of the parameter to extract as an output.

        Returns
        -------
        Parameter
            Parameter with the same name
        """
        if isinstance(self.template, str):
            raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}")

        # here, we build the template early to verify that we can get the outputs
        if isinstance(self.template, Templatable):
            template = self.template._build_template()
        else:
            template = self.template

        # at this point, we know that the template is a `Template` object
        if template.outputs is None:  # type: ignore
            raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}")
        if template.outputs.parameters is None:  # type: ignore
            raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}")
        parameters = template.outputs.parameters  # type: ignore

        obj = next((output for output in parameters if output.name == name), None)
        if obj is not None:
            # rewrite the parameter's value to a templated reference to this step's output
            obj.value = f"{{{{steps.{self.name}.outputs.parameters.{name}}}}}"
            return Parameter(
                name=obj.name,
                value=obj.value,
                value_from=obj.value_from,
                global_name=obj.global_name,
                description=obj.description,
            )
        raise KeyError(f"No output parameter named `{name}` found")

    def _build_as_workflow_step(self) -> _ModelWorkflowStep:
        """Converts this step into the auto-generated `WorkflowStep` model."""
        # templates may be referenced by name (str) or by object, in which case their name is used
        _template = None
        if isinstance(self.template, str):
            _template = self.template
        elif isinstance(self.template, (_ModelTemplate, TemplateMixin)):
            _template = self.template.name

        # inline templates may be raw models or templatable Hera objects that must be built first
        _inline = None
        if isinstance(self.inline, _ModelTemplate):
            _inline = self.inline
        elif isinstance(self.inline, Templatable):
            _inline = self.inline._build_template()

        return _ModelWorkflowStep(
            arguments=self._build_arguments(),
            continue_on=self.continue_on,
            hooks=self.hooks,
            inline=_inline,
            name=self.name,
            on_exit=self.on_exit,
            template=_template,
            template_ref=self.template_ref,
            when=self.when,
            with_items=self._build_with_items(),
            with_param=self._build_with_param(),
            with_sequence=self.with_sequence,
        )

    def _build_step(
        self,
    ) -> List[_ModelWorkflowStep]:
        """Returns this step as a single-element list of workflow steps."""
        return [self._build_as_workflow_step()]

exit_code property

exit_code: str

finished_at property

finished_at: str

id property

id: str

ip property

ip: str

result property

result: str

started_at property

started_at: str

status property

status: str

get_parameter

get_parameter(name)

Returns a Parameter from the task’s outputs based on the name.

Parameters
str

The name of the parameter to extract as an output.

Returns

Parameter Parameter with the same name

Source code in hera/workflows/steps.py
def get_parameter(self, name: str) -> Parameter:
    """Returns a Parameter from the task's outputs based on the name.

    Parameters
    ----------
    name: str
        The name of the parameter to extract as an output.

    Returns
    -------
    Parameter
        Parameter with the same name
    """
    if isinstance(self.template, str):
        raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}")

    # build the template eagerly so its outputs can be inspected and validated
    template = self.template._build_template() if isinstance(self.template, Templatable) else self.template

    # `template` is now a `Template` model; verify it declares output parameters at all
    if template.outputs is None:  # type: ignore
        raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}")
    if template.outputs.parameters is None:  # type: ignore
        raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}")

    for output in template.outputs.parameters:  # type: ignore
        if output.name == name:
            # rewrite the value to a templated reference to this step's output parameter
            output.value = f"{{{{steps.{self.name}.outputs.parameters.{name}}}}}"
            return Parameter(
                name=output.name,
                value=output.value,
                value_from=output.value_from,
                global_name=output.global_name,
                description=output.description,
            )
    raise KeyError(f"No output parameter named `{name}` found")

get_parameters_as

get_parameters_as(name)

Gets all the output parameters from this task

Source code in hera/workflows/steps.py
def get_parameters_as(self, name):
    """Gets all the output parameters from this task"""
    aggregated_ref = f"{{{{steps.{self.name}.outputs.parameters}}}}"
    return Parameter(name=name, value=aggregated_ref)

Steps

A Steps template invocator is used to define a sequence of steps which can run sequentially or in parallel.

Steps implements the contextmanager interface so allows usage of with, under which any hera.workflows.steps.Step objects instantiated will be added to the Steps’ list of sub_steps.

  • Step and Parallel objects initialised within a Steps context will be added to the list of sub_steps in the order they are initialised.
  • All Step objects initialised within a Parallel context will run in parallel.
Source code in hera/workflows/steps.py
class Steps(
    ContextMixin,
    IOMixin,
    TemplateMixin,
):
    """A Steps template invocator is used to define a sequence of steps which can run
    sequentially or in parallel.

    Steps implements the contextmanager interface so allows usage of `with`, under which any
    `hera.workflows.steps.Step` objects instantiated will be added to the Steps' list of sub_steps.

    * Step and Parallel objects initialised within a Steps context will be added to the list of sub_steps
    in the order they are initialised.
    * All Step objects initialised within a Parallel context will run in parallel.
    """

    # collected sub-steps: single steps/parallel groups, raw workflow-step models, or lists thereof
    sub_steps: List[
        Union[
            Step,
            Parallel,
            List[Step],
            _ModelWorkflowStep,
            List[_ModelWorkflowStep],
        ]
    ] = []

    def _build_steps(self) -> Optional[List[List[_ModelWorkflowStep]]]:
        """Converts `sub_steps` into the nested list-of-lists of workflow-step models.

        Each entry in `sub_steps` becomes one inner list; raises `InvalidType` for entries
        (or list elements) of unsupported types. Returns `None` when there are no steps.
        """
        steps = []
        for workflow_step in self.sub_steps:
            if isinstance(workflow_step, Steppable):
                steps.append(workflow_step._build_step())
            elif isinstance(workflow_step, _ModelWorkflowStep):
                # a raw model becomes its own single-element group
                steps.append([workflow_step])
            elif isinstance(workflow_step, List):
                substeps = []
                for s in workflow_step:
                    if isinstance(s, Step):
                        substeps.append(s._build_as_workflow_step())
                    elif isinstance(s, _ModelWorkflowStep):
                        substeps.append(s)
                    else:
                        raise InvalidType(type(s))
                steps.append(substeps)
            else:
                raise InvalidType(type(workflow_step))

        return steps or None

    def _add_sub(self, node: Any):
        """Registers a `Step` or `Parallel` created inside this context; rejects anything else."""
        if not isinstance(node, (Step, Parallel)):
            raise InvalidType(type(node))

        self.sub_steps.append(node)

    def parallel(self) -> Parallel:
        """Returns a Parallel object which can be used in a sub-context manager."""
        return Parallel()

    def _build_template(self) -> _ModelTemplate:
        """Assembles the Argo template model with the built steps attached."""
        return _ModelTemplate(
            active_deadline_seconds=self.active_deadline_seconds,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            container=None,
            container_set=None,
            daemon=self.daemon,
            dag=None,
            data=None,
            executor=self.executor,
            fail_fast=self.fail_fast,
            http=None,
            host_aliases=self.host_aliases,
            init_containers=self.init_containers,
            inputs=self._build_inputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            metrics=self.metrics,
            name=self.name,
            node_selector=self.node_selector,
            outputs=self._build_outputs(),
            parallelism=self.parallelism,
            plugin=self.plugin,
            pod_spec_patch=self.pod_spec_patch,
            priority=self.priority,
            priority_class_name=self.priority_class_name,
            resource=None,
            retry_strategy=self.retry_strategy,
            scheduler_name=self.scheduler_name,
            script=None,
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self.sidecars,
            steps=self._build_steps(),
            suspend=None,
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
        )

sub_steps class-attribute

sub_steps: List[Union[Step, Parallel, List[Step], _ModelWorkflowStep, List[_ModelWorkflowStep]]] = []

parallel

parallel()

Returns a Parallel object which can be used in a sub-context manager.

Source code in hera/workflows/steps.py
def parallel(self) -> Parallel:
    """Create and return a fresh `Parallel` block for use in a sub-context manager."""
    block = Parallel()
    return block

StorageOSVolume

Source code in hera/workflows/volume.py
class StorageOSVolume(_BaseVolume, _ModelStorageOSVolumeSource):
    """Volume representation backed by a StorageOS volume source."""

    def _build_volume(self) -> _ModelVolume:
        """Build the Argo volume model, wrapping the StorageOS source fields."""
        source = _ModelStorageOSVolumeSource(
            fs_type=self.fs_type,
            read_only=self.read_only,
            secret_ref=self.secret_ref,
            volume_name=self.volume_name,
            volume_namespace=self.volume_namespace,
        )
        return _ModelVolume(name=self.name, storageos=source)

Suspend

The Suspend template allows the user to pause a workflow for a specified length of time given by duration or indefinitely (i.e. until manually resumed). The Suspend template also allows you to specify intermediate_parameters which will replicate the given parameters to the “inputs” and “outputs” of the template, resulting in a Suspend template that pauses and waits for values from the user for the given list of parameters.

Source code in hera/workflows/suspend.py
class Suspend(TemplateMixin):
    """A template that pauses a workflow.

    The pause lasts for the given `duration`, or indefinitely (until manually
    resumed) when no duration is set. Any `intermediate_parameters` are
    mirrored onto both the template's inputs and its outputs, producing a
    suspend template that waits for the user to supply values for them.
    """

    duration: Optional[Union[int, str]] = None
    intermediate_parameters: List[Parameter] = []

    def _build_suspend_template(self) -> _ModelSuspendTemplate:
        """Build the suspend model, which carries only the duration."""
        return _ModelSuspendTemplate(duration=self.duration)

    def _build_outputs(self) -> Optional[Outputs]:
        """Expose every intermediate parameter as a `supplied` output, or None when there are none."""
        parameters = [
            Parameter(name=p.name, value_from={"supplied": {}}, description=p.description).as_output()
            for p in self.intermediate_parameters
        ]
        return Outputs(parameters=parameters) if parameters else None

    def _build_inputs(self) -> Optional[Inputs]:
        """Expose every intermediate parameter as a template input, or None when there are none."""
        parameters = [p.as_input() for p in self.intermediate_parameters]
        return Inputs(parameters=parameters) if parameters else None

    def _build_template(self) -> _ModelTemplate:
        """Assemble the full Argo `Template` model for this suspend template."""
        return _ModelTemplate(
            active_deadline_seconds=self.active_deadline_seconds,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            init_containers=self.init_containers,
            inputs=self._build_inputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            name=self.name,
            node_selector=self.node_selector,
            outputs=self._build_outputs(),
            plugin=self.plugin,
            priority_class_name=self.priority_class_name,
            priority=self.priority,
            retry_strategy=self.retry_strategy,
            scheduler_name=self.scheduler_name,
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self.sidecars,
            suspend=self._build_suspend_template(),
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
        )

duration class-attribute

duration: Optional[Union[int, str]] = None

intermediate_parameters class-attribute

intermediate_parameters: List[Parameter] = []

TarArchiveStrategy

Source code in hera/workflows/archive.py
class TarArchiveStrategy(ArchiveStrategy):
    """Archive strategy that tars the artifact, optionally at a set compression level."""

    compression_level: Optional[int] = None

    def _build_archive_strategy(self) -> _ModelArchiveStrategy:
        """Build the archive strategy model with a tar sub-strategy."""
        tar = _ModelTarStrategy(compression_level=self.compression_level)
        return _ModelArchiveStrategy(tar=tar)

compression_level class-attribute

compression_level: Optional[int] = None

Task

Task is used to run a given template within a DAG. Must be instantiated under a DAG context.

Dependencies

Any Tasks without a dependency defined will start immediately.

Dependencies between Tasks can be described using the convenience syntax >>, for example: A = Task(…) B = Task(…) A >> B describes the relationships: * "A has no dependencies (so starts immediately)" * "B depends on A". As a diagram:

A
|
B

A >> B is equivalent to A.next(B).

Lists of Tasks

A list of Tasks used with the rshift syntax describes an “AND” dependency between the single Task on the left of >> and the list Tasks to the right of >> (or vice versa). A list of Tasks on both sides of >> is not supported.

For example

A = Task(…) B = Task(…) C = Task(…) D = Task(…) A >> [B, C] >> D

describes the relationships: * “A has no dependencies * “B AND C depend on A” * “D depends on B AND C”

As a diagram

  A
 / \
B   C
 \ /
  D

Dependencies can be described over multiple statements

A = Task(…) B = Task(…) C = Task(…) D = Task(…) A >> [C, D] B >> [C, D]

describes the relationships: * "A and B have no dependencies" * "C depends on A AND B" * "D depends on A AND B". As a diagram:

A   B
| X |
C   D

Source code in hera/workflows/task.py
class Task(
    TemplateInvocatorSubNodeMixin,
    ArgumentsMixin,
    SubNodeMixin,
    ParameterMixin,
    ItemMixin,
):
    r"""Task is used to run a given template within a DAG. Must be instantiated under a DAG context.

    ## Dependencies
    Any Tasks without a dependency defined will start immediately.

    Dependencies between Tasks can be described using the convenience syntax `>>`, for example:
        A = Task(...)
        B = Task(...)
        A >> B
    describes the relationships:
    * "A has no dependencies (so starts immediately)"
    * "B depends on A".
    As a diagram:
    A
    |
    B

    `A >> B` is equivalent to `A.next(B)`.

    ## Lists of Tasks
    A list of Tasks used with the rshift syntax describes an "AND" dependency between the single Task on the left of
    `>>` and the list Tasks to the right of `>>` (or vice versa). A list of Tasks on both sides of `>>` is not supported.
    For example:
        A = Task(...)
        B = Task(...)
        C = Task(...)
        D = Task(...)
        A >> [B, C] >> D
    describes the relationships:
    * "A has no dependencies"
    * "B AND C depend on A"
    * "D depends on B AND C"
    As a diagram:
      A
     / \
    B   C
     \ /
      D

    Dependencies can be described over multiple statements:
        A = Task(...)
        B = Task(...)
        C = Task(...)
        D = Task(...)
        A >> [C, D]
        B >> [C, D]
    describes the relationships:
    * "A and B have no dependencies"
    * "C depends on A AND B"
    * "D depends on A AND B"
    As a diagram:
    A   B
    | X |
    C   D
    """

    dependencies: Optional[List[str]] = None
    depends: Optional[str] = None

    def _get_dependency_tasks(self) -> List[str]:
        """Return the task names referenced in `depends`, stripped of operators and result suffixes."""
        if self.depends is None:
            return []

        # filter out operators
        all_operators = [o for o in Operator]
        tasks = [t for t in self.depends.split() if t not in all_operators]

        # remove dot suffixes (e.g. `A.Succeeded` -> `A`)
        task_names = [t.split(".")[0] for t in tasks]
        return task_names

    @property
    def id(self) -> str:
        """Argo expression resolving to this task's id at runtime."""
        return f"{{{{tasks.{self.name}.id}}}}"

    @property
    def ip(self) -> str:
        """Argo expression resolving to this task's IP at runtime."""
        return f"{{{{tasks.{self.name}.ip}}}}"

    @property
    def status(self) -> str:
        """Argo expression resolving to this task's status at runtime."""
        return f"{{{{tasks.{self.name}.status}}}}"

    @property
    def exit_code(self) -> str:
        """Argo expression resolving to this task's exit code at runtime."""
        return f"{{{{tasks.{self.name}.exitCode}}}}"

    @property
    def started_at(self) -> str:
        """Argo expression resolving to this task's start time at runtime."""
        return f"{{{{tasks.{self.name}.startedAt}}}}"

    @property
    def finished_at(self) -> str:
        """Argo expression resolving to this task's finish time at runtime."""
        return f"{{{{tasks.{self.name}.finishedAt}}}}"

    @property
    def result(self) -> str:
        """Argo expression resolving to this task's output result at runtime."""
        return f"{{{{tasks.{self.name}.outputs.result}}}}"

    def get_parameters_as(self, name):
        """Gets all the output parameters from this task"""
        return Parameter(name=name, value=f"{{{{tasks.{self.name}.outputs.parameters}}}}")

    def get_parameter(self, name: str) -> Parameter:
        """Returns a Parameter from the task's outputs based on the name.

        Parameters
        ----------
        name: str
            The name of the parameter to extract as an output.

        Returns
        -------
        Parameter
            Parameter with the same name

        Raises
        ------
        ValueError
            If the template was set via a name only, or declares no (matching) outputs.
        KeyError
            If no output parameter with the given name exists.
        """
        if isinstance(self.template, str):
            raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}")

        # here, we build the template early to verify that we can get the outputs
        if isinstance(self.template, Templatable):
            template = self.template._build_template()
        else:
            template = self.template

        # at this point, we know that the template is a `Template` object
        if template.outputs is None:  # type: ignore
            raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}")
        if template.outputs.parameters is None:  # type: ignore
            raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}")
        parameters = template.outputs.parameters  # type: ignore

        obj = next((output for output in parameters if output.name == name), None)
        if obj is not None:
            # rewrite the value so callers receive a reference into this task's outputs
            obj.value = f"{{{{tasks.{self.name}.outputs.parameters.{name}}}}}"
            return Parameter(
                name=obj.name,
                value=obj.value,
                value_from=obj.value_from,
                global_name=obj.global_name,
                description=obj.description,
            )
        raise KeyError(f"No output parameter named `{name}` found")

    def next(self, other: Task, operator: Operator = Operator.and_, on: Optional[TaskResult] = None) -> Task:
        """Set self as a dependency of `other`, optionally guarded by the task result `on`; returns `other`."""
        assert issubclass(other.__class__, Task)

        condition = f".{on}" if on else ""

        if other.depends is None:
            # First dependency
            other.depends = self.name + condition
        elif self.name in other._get_dependency_tasks():
            raise ValueError(f"{self.name} already in {other.name}'s depends: {other.depends}")
        else:
            # Add follow-up dependency
            other.depends += f" {operator} {self.name + condition}"
        return other

    def __rrshift__(self, other: List[Union[Task, str]]) -> Task:
        """Set each element of `other` (Tasks or raw depends strings) as a dependency of self."""
        assert isinstance(other, list), f"Unknown type {type(other)} specified using reverse right bitshift operator"
        for o in other:
            if isinstance(o, Task):
                o.next(self)
            else:
                assert isinstance(
                    o, str
                ), f"Unknown list item type {type(o)} specified using reverse right bitshift operator"
                if self.depends is None:
                    self.depends = o
                else:
                    self.depends += f" && {o}"
        return self

    def __rshift__(self, other: Union[Task, List[Task]]) -> Union[Task, List[Task]]:
        """Set self as a dependency of `other` which can be a single Task or list of Tasks."""
        if isinstance(other, Task):
            return self.next(other)
        elif isinstance(other, list):
            for o in other:
                assert isinstance(
                    o, Task
                ), f"Unknown list item type {type(o)} specified using right bitshift operator `>>`"
                self.next(o)
            return other
        raise ValueError(f"Unknown type {type(other)} provided to `__rshift__`")

    def __or__(self, other: Union[Task, str]) -> str:
        """Return an `||` condition expression combining self with `other`."""
        if isinstance(other, Task):
            return f"({self.name} || {other.name})"
        assert isinstance(other, str), f"Unknown type {type(other)} specified using `|` operator"
        return f"{self.name} || {other}"

    def on_workflow_status(self, status: WorkflowStatus, op: Operator = Operator.equals) -> Task:
        """Append a `when` guard comparing the overall workflow status to `status` via `op`."""
        expression = f"{{{{workflow.status}}}} {op} {status}"
        if self.when:
            self.when += f" {Operator.and_} {expression}"
        else:
            self.when = expression
        return self

    def on_success(self, other: Task) -> Task:
        """Run `other` when this task succeeds."""
        return self.next(other, on=TaskResult.succeeded)

    def on_failure(self, other: Task) -> Task:
        """Run `other` when this task fails."""
        return self.next(other, on=TaskResult.failed)

    def on_error(self, other: Task) -> Task:
        """Run `other` when this task errors."""
        return self.next(other, on=TaskResult.errored)

    def on_other_result(self, other: Task, value: str, operator: Operator = Operator.equals) -> Task:
        """Run self when `other`'s result compares to `value` via `operator`; also makes self depend on `other`."""
        expression = f"{other.result} {operator} {value}"
        if self.when:
            self.when += f" {Operator.and_} {expression}"
        else:
            self.when = expression
        other.next(self)
        return self

    def when_any_succeeded(self, other: Task) -> Task:
        """Run `other` when any fanned-out instance of this task succeeds.

        Requires the task to fan out via `with_param` or `with_sequence`.
        """
        # fix: the assertion message previously referenced `when_all_failed`
        assert (self.with_param is not None) or (
            self.with_sequence is not None
        ), "Can only use `when_any_succeeded` when using `with_param` or `with_sequence`"

        return self.next(other, on=TaskResult.any_succeeded)

    def when_all_failed(self, other: Task) -> Task:
        """Run `other` when all fanned-out instances of this task fail.

        Requires the task to fan out via `with_param` or `with_sequence`.
        """
        assert (self.with_param is not None) or (
            self.with_sequence is not None
        ), "Can only use `when_all_failed` when using `with_param` or `with_sequence`"

        return self.next(other, on=TaskResult.all_failed)

    def _build_dag_task(self) -> _ModelDAGTask:
        """Assemble the Argo `DAGTask` model, resolving the template reference or inline template."""
        _template = None
        if isinstance(self.template, str):
            _template = self.template
        elif isinstance(self.template, (Template, TemplateMixin)):
            _template = self.template.name

        _inline = None
        if isinstance(self.inline, Template):
            _inline = self.inline
        elif isinstance(self.inline, Templatable):
            _inline = self.inline._build_template()

        return _ModelDAGTask(
            arguments=self._build_arguments(),
            continue_on=self.continue_on,
            dependencies=self.dependencies,
            depends=self.depends,
            hooks=self.hooks,
            inline=_inline,
            name=self.name,
            on_exit=self.on_exit,
            template=_template,
            template_ref=self.template_ref,
            when=self.when,
            with_items=self._build_with_items(),
            with_param=self._build_with_param(),
            with_sequence=self.with_sequence,
        )

dependencies class-attribute

dependencies: Optional[List[str]] = None

depends class-attribute

depends: Optional[str] = None

exit_code property

exit_code: str

finished_at property

finished_at: str

id property

id: str

ip property

ip: str

result property

result: str

started_at property

started_at: str

status property

status: str

get_parameter

get_parameter(name)

Returns a Parameter from the task’s outputs based on the name.

Parameters
str

The name of the parameter to extract as an output.

Returns

Parameter Parameter with the same name

Source code in hera/workflows/task.py
def get_parameter(self, name: str) -> Parameter:
    """Return an output `Parameter` of this task's template by name.

    Parameters
    ----------
    name: str
        The name of the parameter to extract as an output.

    Returns
    -------
    Parameter
        Parameter with the same name

    Raises
    ------
    ValueError
        If the template was set via a name only, or declares no (matching) outputs.
    KeyError
        If no output parameter with the given name exists.
    """
    if isinstance(self.template, str):
        raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}")

    # build the template eagerly so its outputs can be inspected
    template = self.template._build_template() if isinstance(self.template, Templatable) else self.template

    # the template is now a `Template` object, so its outputs can be checked
    if template.outputs is None:  # type: ignore
        raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}")
    if template.outputs.parameters is None:  # type: ignore
        raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}")

    for output in template.outputs.parameters:  # type: ignore
        if output.name != name:
            continue
        # rewrite the value so callers receive a reference into this task's outputs
        output.value = f"{{{{tasks.{self.name}.outputs.parameters.{name}}}}}"
        return Parameter(
            name=output.name,
            value=output.value,
            value_from=output.value_from,
            global_name=output.global_name,
            description=output.description,
        )
    raise KeyError(f"No output parameter named `{name}` found")

get_parameters_as

get_parameters_as(name)

Gets all the output parameters from this task

Source code in hera/workflows/task.py
def get_parameters_as(self, name):
    """Return a single `Parameter` that aggregates all of this task's output parameters."""
    value = "{{{{tasks.{0}.outputs.parameters}}}}".format(self.name)
    return Parameter(name=name, value=value)

next

next(other, operator=Operator.and_, on=None)

Set self as a dependency of other.

Source code in hera/workflows/task.py
def next(self, other: Task, operator: Operator = Operator.and_, on: Optional[TaskResult] = None) -> Task:
    """Register self as a dependency of `other`, optionally guarded by `on`; returns `other`."""
    assert issubclass(other.__class__, Task)

    suffix = f".{on}" if on else ""
    dependency = self.name + suffix

    if other.depends is None:
        # first dependency recorded on `other`
        other.depends = dependency
    elif self.name in other._get_dependency_tasks():
        raise ValueError(f"{self.name} already in {other.name}'s depends: {other.depends}")
    else:
        # append to the existing depends expression
        other.depends += f" {operator} {dependency}"
    return other

on_error

on_error(other)
Source code in hera/workflows/task.py
def on_error(self, other: Task) -> Task:
    """Run `other` when this task errors."""
    result = TaskResult.errored
    return self.next(other, on=result)

on_failure

on_failure(other)
Source code in hera/workflows/task.py
def on_failure(self, other: Task) -> Task:
    """Run `other` when this task fails."""
    result = TaskResult.failed
    return self.next(other, on=result)

on_other_result

on_other_result(other, value, operator=Operator.equals)
Source code in hera/workflows/task.py
def on_other_result(self, other: Task, value: str, operator: Operator = Operator.equals) -> Task:
    """Run self when `other`'s result compares to `value` via `operator`; also makes self depend on `other`."""
    clause = f"{other.result} {operator} {value}"
    self.when = f"{self.when} {Operator.and_} {clause}" if self.when else clause
    other.next(self)
    return self

on_success

on_success(other)
Source code in hera/workflows/task.py
def on_success(self, other: Task) -> Task:
    """Run `other` when this task succeeds."""
    result = TaskResult.succeeded
    return self.next(other, on=result)

on_workflow_status

on_workflow_status(status, op=Operator.equals)
Source code in hera/workflows/task.py
def on_workflow_status(self, status: WorkflowStatus, op: Operator = Operator.equals) -> Task:
    """Append a `when` guard comparing the overall workflow status to `status` via `op`."""
    clause = f"{{{{workflow.status}}}} {op} {status}"
    self.when = f"{self.when} {Operator.and_} {clause}" if self.when else clause
    return self

when_all_failed

when_all_failed(other)
Source code in hera/workflows/task.py
def when_all_failed(self, other: Task) -> Task:
    """Run `other` when all fanned-out instances of this task fail.

    Requires the task to fan out via `with_param` or `with_sequence`.
    """
    fanned_out = (self.with_param is not None) or (self.with_sequence is not None)
    assert fanned_out, "Can only use `when_all_failed` when using `with_param` or `with_sequence`"
    return self.next(other, on=TaskResult.all_failed)

when_any_succeeded

when_any_succeeded(other)
Source code in hera/workflows/task.py
def when_any_succeeded(self, other: Task) -> Task:
    """Run `other` when any fanned-out instance of this task succeeds.

    Requires the task to fan out via `with_param` or `with_sequence`.
    """
    # fix: the assertion message previously referenced `when_all_failed`
    assert (self.with_param is not None) or (
        self.with_sequence is not None
    ), "Can only use `when_any_succeeded` when using `with_param` or `with_sequence`"

    return self.next(other, on=TaskResult.any_succeeded)

TaskResult

The enumeration of Task Results specified at https://argoproj.github.io/argo-workflows/enhanced-depends-logic/#depends

Source code in hera/workflows/task.py
class TaskResult(Enum):
    """The enumeration of Task Results specified at
    https://argoproj.github.io/argo-workflows/enhanced-depends-logic/#depends
    """

    # individual task results, usable as `.Suffix` on a depends expression
    failed = "Failed"
    succeeded = "Succeeded"
    errored = "Errored"
    skipped = "Skipped"
    omitted = "Omitted"
    daemoned = "Daemoned"
    # aggregate results for fanned-out tasks (require `with_param`/`with_sequence`)
    any_succeeded = "AnySucceeded"
    all_failed = "AllFailed"

all_failed class-attribute

all_failed = 'AllFailed'

any_succeeded class-attribute

any_succeeded = 'AnySucceeded'

daemoned class-attribute

daemoned = 'Daemoned'

errored class-attribute

errored = 'Errored'

failed class-attribute

failed = 'Failed'

omitted class-attribute

omitted = 'Omitted'

skipped class-attribute

skipped = 'Skipped'

succeeded class-attribute

succeeded = 'Succeeded'

UserContainer

Source code in hera/workflows/user_container.py
class UserContainer(_ModelUserContainer):
    """Hera wrapper around the generated `UserContainer` model.

    Overrides several model fields to accept Hera-friendly types (env wrappers,
    `Resources`, `_BaseVolume`); `build` converts the instance back into the
    plain model, rendering `volumes` into `volume_mounts`.
    """

    env: Optional[List[Union[_BaseEnv, EnvVar]]] = None  # type: ignore[assignment]
    env_from: Optional[List[Union[_BaseEnvFrom, EnvFromSource]]] = None  # type: ignore[assignment]
    image_pull_policy: Optional[Union[str, ImagePullPolicy]] = None  # type: ignore[assignment]
    resources: Optional[Union[Resources, ResourceRequirements]] = None  # type: ignore[assignment]
    volumes: Optional[List[_BaseVolume]] = None

    def build(self) -> _ModelUserContainer:
        """Build the plain `UserContainer` model from this wrapper's fields."""
        return _ModelUserContainer(
            args=self.args,
            command=self.command,
            env=self.env,
            env_from=self.env_from,
            image=self.image,
            image_pull_policy=self.image_pull_policy,
            lifecycle=self.lifecycle,
            liveness_probe=self.liveness_probe,
            mirror_volume_mounts=self.mirror_volume_mounts,
            name=self.name,
            ports=self.ports,
            readiness_probe=self.readiness_probe,
            resources=self.resources,
            security_context=self.security_context,
            startup_probe=self.startup_probe,
            stdin=self.stdin,
            stdin_once=self.stdin_once,
            termination_message_path=self.termination_message_path,
            termination_message_policy=self.termination_message_policy,
            tty=self.tty,
            volume_devices=self.volume_devices,
            # mounts are derived from the Hera volumes rather than passed through
            volume_mounts=None if self.volumes is None else [v._build_volume_mount() for v in self.volumes],
            working_dir=self.working_dir,
        )

env class-attribute

env: Optional[List[Union[_BaseEnv, EnvVar]]] = None

env_from class-attribute

env_from: Optional[List[Union[_BaseEnvFrom, EnvFromSource]]] = None

image_pull_policy class-attribute

image_pull_policy: Optional[Union[str, ImagePullPolicy]] = None

resources class-attribute

resources: Optional[Union[Resources, ResourceRequirements]] = None

volumes class-attribute

volumes: Optional[List[_BaseVolume]] = None

build

build()
Source code in hera/workflows/user_container.py
def build(self) -> _ModelUserContainer:
    """Build the plain `UserContainer` model from this wrapper's fields."""
    return _ModelUserContainer(
        args=self.args,
        command=self.command,
        env=self.env,
        env_from=self.env_from,
        image=self.image,
        image_pull_policy=self.image_pull_policy,
        lifecycle=self.lifecycle,
        liveness_probe=self.liveness_probe,
        mirror_volume_mounts=self.mirror_volume_mounts,
        name=self.name,
        ports=self.ports,
        readiness_probe=self.readiness_probe,
        resources=self.resources,
        security_context=self.security_context,
        startup_probe=self.startup_probe,
        stdin=self.stdin,
        stdin_once=self.stdin_once,
        termination_message_path=self.termination_message_path,
        termination_message_policy=self.termination_message_policy,
        tty=self.tty,
        volume_devices=self.volume_devices,
        # mounts are derived from the Hera volumes rather than passed through
        volume_mounts=None if self.volumes is None else [v._build_volume_mount() for v in self.volumes],
        working_dir=self.working_dir,
    )

Volume

Source code in hera/workflows/volume.py
class Volume(_BaseVolume, _ModelPersistentVolumeClaimSpec):
    """A dynamically-provisioned volume backed by a persistent volume claim.

    A `size` shortcut is merged into `resources.requests["storage"]`, the
    `name` defaults to a random UUID, and `access_modes` entries are coerced
    to `AccessMode` members (defaulting to ReadWriteOnce).
    """

    size: Optional[str] = None  # type: ignore
    resources: Optional[ResourceRequirements] = None
    metadata: Optional[ObjectMeta] = None
    access_modes: Optional[List[Union[str, AccessMode]]] = [AccessMode.read_write_once]  # type: ignore
    storage_class_name: Optional[str] = None

    @validator("access_modes", pre=True, always=True)
    def _check_access_modes(cls, v):
        # coerce every entry to an `AccessMode`; fall back to ReadWriteOnce when unset/empty
        if not v:
            return [AccessMode.read_write_once]

        result = []
        for mode in v:
            if isinstance(mode, AccessMode):
                result.append(mode)
            else:
                result.append(AccessMode(mode))
        return result

    @validator("name", pre=True, always=True)
    def _check_name(cls, v):
        # generate a random name when none is supplied
        return v or str(uuid.uuid4())

    @root_validator(pre=True)
    def _merge_reqs(cls, values):
        # reconcile the `size` shortcut with an explicit `resources` requirement;
        # an explicit `resources.requests["storage"]` always wins over `size`
        if "size" in values and "resources" in values:
            resources: ResourceRequirements = values.get("resources")
            if resources.requests is not None:
                if "storage" in resources.requests:
                    pass  # take the storage specification in resources
                else:
                    resources.requests["storage"] = values.get("size")
            values["resources"] = resources
        elif "resources" not in values:
            assert "size" in values, "at least one of `size` or `resources` must be specified"
            validate_storage_units(cast(str, values.get("size")))
            values["resources"] = ResourceRequirements(requests={"storage": values.get("size")})
        elif "resources" in values:
            resources = cast(ResourceRequirements, values.get("resources"))
            assert resources.requests is not None, "Resource requests are required"
            storage = resources.requests.get("storage")
            assert storage is not None, "At least one of `size` or `resources.requests.storage` must be specified"
            validate_storage_units(cast(str, storage))
        return values

    def _build_persistent_volume_claim(self) -> _ModelPersistentVolumeClaim:
        """Build the PVC model from this volume's metadata and spec fields."""
        return _ModelPersistentVolumeClaim(
            metadata=self.metadata or ObjectMeta(name=self.name),
            spec=_ModelPersistentVolumeClaimSpec(
                # serialize access modes to their string values
                access_modes=[str(cast(AccessMode, am).value) for am in self.access_modes]
                if self.access_modes is not None
                else None,
                data_source=self.data_source,
                data_source_ref=self.data_source_ref,
                resources=self.resources,
                selector=self.selector,
                storage_class_name=self.storage_class_name,
                volume_mode=self.volume_mode,
                volume_name=self.volume_name,
            ),
        )

    def _build_volume(self) -> _ModelVolume:
        """Build the Argo volume model referencing the claim produced by this volume."""
        claim = self._build_persistent_volume_claim()
        assert claim.metadata is not None, "claim metadata is required"
        return _ModelVolume(
            name=self.name,
            persistent_volume_claim=_ModelPersistentVolumeClaimVolumeSource(
                claim_name=cast(str, claim.metadata.name),
                read_only=self.read_only,
            ),
        )

access_modes class-attribute

access_modes: Optional[List[Union[str, AccessMode]]] = [AccessMode.read_write_once]

metadata class-attribute

metadata: Optional[ObjectMeta] = None

resources class-attribute

resources: Optional[ResourceRequirements] = None

size class-attribute

size: Optional[str] = None

storage_class_name class-attribute

storage_class_name: Optional[str] = None

VsphereVirtualDiskVolume

Source code in hera/workflows/volume.py
class VsphereVirtualDiskVolume(_BaseVolume, _ModelVsphereVirtualDiskVolumeSource):
    """Volume representation backed by a vSphere virtual disk volume source."""

    def _build_volume(self) -> _ModelVolume:
        """Build the Argo volume model, wrapping the vSphere virtual disk source fields."""
        source = _ModelVsphereVirtualDiskVolumeSource(
            fs_type=self.fs_type,
            storage_policy_id=self.storage_policy_id,
            storage_policy_name=self.storage_policy_name,
            volume_path=self.volume_path,
        )
        return _ModelVolume(name=self.name, vsphere_volume=source)

Workflow

The base Workflow class for Hera.

Workflow implements the contextmanager interface so allows usage of with, under which any hera.workflows.protocol.Templatable object or hera.workflows.models.Template object instantiated under the context will be added to the Workflow’s list of templates.

Workflows can be created directly on your Argo cluster via create. They can also be dumped to yaml via to_yaml or built according to the Argo schema via build to get an OpenAPI model object.

Source code in hera/workflows/workflow.py
class Workflow(
    ArgumentsMixin,
    ContextMixin,
    HookMixin,
    VolumeMixin,
):
    """The base Workflow class for Hera.

    Workflow implements the contextmanager interface so it allows usage of `with`, under which
    any `hera.workflows.protocol.Templatable` object or `hera.workflows.models.Template` object
    instantiated under the context will be added to the Workflow's list of templates.

    Workflows can be created directly on your Argo cluster via `create`. They can also be dumped
    to yaml via `to_yaml` or built according to the Argo schema via `build` to get an OpenAPI model
    object.
    """

    # Workflow fields - https://argoproj.github.io/argo-workflows/fields/#workflow
    api_version: Optional[str] = None
    kind: Optional[str] = None
    status: Optional[WorkflowStatus] = None

    # ObjectMeta fields - https://argoproj.github.io/argo-workflows/fields/#objectmeta
    annotations: Optional[Dict[str, str]] = None
    cluster_name: Optional[str] = None
    creation_timestamp: Optional[Time] = None
    deletion_grace_period_seconds: Optional[int] = None
    deletion_timestamp: Optional[Time] = None
    finalizers: Optional[List[str]] = None
    generate_name: Optional[str] = None
    generation: Optional[int] = None
    labels: Optional[Dict[str, str]] = None
    managed_fields: Optional[List[ManagedFieldsEntry]] = None
    name: Optional[str] = None
    namespace: Optional[str] = None
    owner_references: Optional[List[OwnerReference]] = None
    resource_version: Optional[str] = None
    self_link: Optional[str] = None
    uid: Optional[str] = None

    # WorkflowSpec fields - https://argoproj.github.io/argo-workflows/fields/#workflowspec
    active_deadline_seconds: Optional[int] = None
    affinity: Optional[Affinity] = None
    archive_logs: Optional[bool] = None
    artifact_gc: Optional[ArtifactGC] = None
    artifact_repository_ref: Optional[ArtifactRepositoryRef] = None
    automount_service_account_token: Optional[bool] = None
    dns_config: Optional[PodDNSConfig] = None
    dns_policy: Optional[str] = None
    entrypoint: Optional[str] = None
    executor: Optional[ExecutorConfig] = None
    hooks: Optional[Dict[str, LifecycleHook]] = None
    host_aliases: Optional[List[HostAlias]] = None
    host_network: Optional[bool] = None
    image_pull_secrets: ImagePullSecrets = None
    metrics: Optional[Metrics] = None
    node_selector: Optional[Dict[str, str]] = None
    on_exit: Optional[str] = None
    parallelism: Optional[int] = None
    pod_disruption_budget: Optional[PodDisruptionBudgetSpec] = None
    pod_gc: Optional[PodGC] = None
    pod_metadata: Optional[Metadata] = None
    pod_priority: Optional[int] = None
    pod_priority_class_name: Optional[str] = None
    pod_spec_patch: Optional[str] = None
    priority: Optional[int] = None
    retry_strategy: Optional[RetryStrategy] = None
    scheduler_name: Optional[str] = None
    security_context: Optional[PodSecurityContext] = None
    service_account_name: Optional[str] = None
    shutdown: Optional[str] = None
    suspend: Optional[bool] = None
    synchronization: Optional[Synchronization] = None
    template_defaults: Optional[Template] = None
    # NOTE: mutable default is safe under pydantic, which copies field defaults per instance
    templates: List[Union[Template, Templatable]] = []
    tolerations: Optional[List[Toleration]] = None
    ttl_strategy: Optional[TTLStrategy] = None
    volume_claim_gc: Optional[VolumeClaimGC] = None
    volume_claim_templates: Optional[List[PersistentVolumeClaim]] = None
    workflow_metadata: Optional[WorkflowMetadata] = None
    workflow_template_ref: Optional[WorkflowTemplateRef] = None

    # Hera-specific fields
    workflows_service: Optional[WorkflowsService] = None

    @validator("api_version", pre=True, always=True)
    def _set_api_version(cls, v):
        """Default `api_version` from the global Hera config when it is not provided."""
        if v is None:
            return global_config.api_version
        return v

    @validator("workflows_service", pre=True, always=True)
    def _set_workflows_service(cls, v):
        """Fall back to a default `WorkflowsService` instance when none is provided."""
        if v is None:
            return WorkflowsService()
        return v

    @validator("kind", pre=True, always=True)
    def _set_kind(cls, v):
        """Default `kind` to the class name (e.g. `Workflow`, or a subclass name)."""
        if v is None:
            return cls.__name__  # type: ignore
        return v

    @validator("namespace", pre=True, always=True)
    def _set_namespace(cls, v):
        """Default the namespace from the global Hera config when not provided."""
        if v is None:
            return global_config.namespace
        return v

    @validator("service_account_name", pre=True, always=True)
    def _set_service_account_name(cls, v):
        """Default the service account name from the global Hera config when not provided."""
        if v is None:
            return global_config.service_account_name
        return v

    @validator("image_pull_secrets", pre=True, always=True)
    def _set_image_pull_secrets(cls, v):
        """Normalize `image_pull_secrets` into a list of `LocalObjectReference`.

        Accepts a single `str`, a single `LocalObjectReference`, or a list of either.
        """
        if v is None:
            return None

        if isinstance(v, str):
            return [LocalObjectReference(name=v)]
        elif isinstance(v, LocalObjectReference):
            return [v]

        assert isinstance(v, list), (
            "`image_pull_secrets` expected to be either a `str`, a `LocalObjectReferences`, a list of `str`, "
            "or a list of `LocalObjectReferences`"
        )
        result = []
        # NOTE(review): list items that are neither `str` nor `LocalObjectReference` are
        # silently dropped here — confirm this is intentional
        for secret in v:
            if isinstance(secret, str):
                result.append(LocalObjectReference(name=secret))
            elif isinstance(secret, LocalObjectReference):
                result.append(secret)
        return result

    def build(self) -> TWorkflow:
        """Builds the Workflow and its components into an Argo schema Workflow object."""
        # Apply registered hooks first; the (possibly modified) result is used for building
        self = self._dispatch_hooks()

        templates = []
        for template in self.templates:
            if isinstance(template, HookMixin):
                template = template._dispatch_hooks()

            if isinstance(template, Templatable):
                templates.append(template._build_template())
            elif isinstance(template, get_args(TTemplate)):
                templates.append(template)
            else:
                raise InvalidType(f"{type(template)} is not a valid template type")

            # Collect any PVC templates the template itself defines and merge them
            # into the workflow-level volume claim templates
            if isinstance(template, VolumeClaimable):
                claims = template._build_persistent_volume_claims()
                # If there are no claims, continue, nothing to add
                if not claims:
                    continue
                # If there are no volume claim templates, set them to the constructed claims
                elif self.volume_claim_templates is None:
                    self.volume_claim_templates = claims
                else:
                    # otherwise, we need to merge the two lists of volume claim templates. This prioritizes the
                    # already existing volume claim templates under the assumption that the user has already set
                    # a claim template on the workflow intentionally, or the user is sharing the same volumes across
                    # different templates
                    current_volume_claims_map = {}
                    for claim in self.volume_claim_templates:
                        assert claim.metadata is not None, "expected a workflow volume claim with metadata"
                        assert claim.metadata.name is not None, "expected a named workflow volume claim"
                        current_volume_claims_map[claim.metadata.name] = claim

                    new_volume_claims_map = {}
                    for claim in claims:
                        assert claim.metadata is not None, "expected a volume claim with metadata"
                        assert claim.metadata.name is not None, "expected a named volume claim"
                        new_volume_claims_map[claim.metadata.name] = claim

                    for claim_name, claim in new_volume_claims_map.items():
                        if claim_name not in current_volume_claims_map:
                            self.volume_claim_templates.append(claim)

        # Workflow-level claims are appended after any template-derived claims
        workflow_claims = self._build_persistent_volume_claims()
        volume_claim_templates = (self.volume_claim_templates or []) + (workflow_claims or [])
        return _ModelWorkflow(
            api_version=self.api_version,
            kind=self.kind,
            metadata=ObjectMeta(
                annotations=self.annotations,
                cluster_name=self.cluster_name,
                creation_timestamp=self.creation_timestamp,
                deletion_grace_period_seconds=self.deletion_grace_period_seconds,
                deletion_timestamp=self.deletion_timestamp,
                finalizers=self.finalizers,
                generate_name=self.generate_name,
                generation=self.generation,
                labels=self.labels,
                managed_fields=self.managed_fields,
                name=self.name,
                namespace=self.namespace,
                owner_references=self.owner_references,
                resource_version=self.resource_version,
                self_link=self.self_link,
                uid=self.uid,
            ),
            spec=_ModelWorkflowSpec(
                active_deadline_seconds=self.active_deadline_seconds,
                affinity=self.affinity,
                archive_logs=self.archive_logs,
                arguments=self._build_arguments(),
                artifact_gc=self.artifact_gc,
                artifact_repository_ref=self.artifact_repository_ref,
                automount_service_account_token=self.automount_service_account_token,
                dns_config=self.dns_config,
                dns_policy=self.dns_policy,
                entrypoint=self.entrypoint,
                executor=self.executor,
                hooks=self.hooks,
                host_aliases=self.host_aliases,
                host_network=self.host_network,
                image_pull_secrets=self.image_pull_secrets,
                metrics=self.metrics,
                node_selector=self.node_selector,
                on_exit=self.on_exit,
                parallelism=self.parallelism,
                pod_disruption_budget=self.pod_disruption_budget,
                pod_gc=self.pod_gc,
                pod_metadata=self.pod_metadata,
                pod_priority=self.pod_priority,
                pod_priority_class_name=self.pod_priority_class_name,
                pod_spec_patch=self.pod_spec_patch,
                priority=self.priority,
                retry_strategy=self.retry_strategy,
                scheduler_name=self.scheduler_name,
                security_context=self.security_context,
                service_account_name=self.service_account_name,
                shutdown=self.shutdown,
                suspend=self.suspend,
                synchronization=self.synchronization,
                template_defaults=self.template_defaults,
                templates=templates or None,
                tolerations=self.tolerations,
                ttl_strategy=self.ttl_strategy,
                volume_claim_gc=self.volume_claim_gc,
                volume_claim_templates=volume_claim_templates or None,
                volumes=self._build_volumes(),
                workflow_metadata=self.workflow_metadata,
                workflow_template_ref=self.workflow_template_ref,
            ),
            status=self.status,
        )

    def to_dict(self) -> Any:
        """Builds the Workflow as an Argo schema Workflow object and returns it as a dictionary."""
        return self.build().dict(exclude_none=True, by_alias=True)

    def to_yaml(self, *args, **kwargs) -> str:
        """Builds the Workflow as an Argo schema Workflow object and returns it as yaml string.

        Raises:
            ImportError: when PyYAML is not installed.
        """
        if not _yaml:
            raise ImportError("PyYAML is not installed")
        # Set some default options if not provided by the user
        kwargs.setdefault("default_flow_style", False)
        kwargs.setdefault("sort_keys", False)
        return _yaml.dump(self.to_dict(), *args, **kwargs)

    def create(self) -> TWorkflow:
        """Creates the Workflow on the Argo cluster."""
        assert self.workflows_service, "workflow service not initialized"
        assert self.namespace, "workflow namespace not defined"
        return self.workflows_service.create_workflow(self.namespace, WorkflowCreateRequest(workflow=self.build()))

    def lint(self) -> TWorkflow:
        """Lints the Workflow using the Argo cluster."""
        assert self.workflows_service, "workflow service not initialized"
        assert self.namespace, "workflow namespace not defined"
        return self.workflows_service.lint_workflow(self.namespace, WorkflowLintRequest(workflow=self.build()))

    def _add_sub(self, node: Any):
        """Adds any objects instantiated under the Workflow context manager that conform to the `Templatable` protocol
        or are Argo schema Template objects to the Workflow's list of templates.

        Raises:
            InvalidType: when the node is neither `Templatable` nor an Argo schema `Template`.
        """
        if not isinstance(node, (Templatable, *get_args(Template))):
            raise InvalidType(type(node))
        self.templates.append(node)

    def to_file(self, output_directory: Union[Path, str] = ".", name: str = "", *args, **kwargs) -> Path:
        """Writes the Workflow as an Argo schema Workflow object to a YAML file and returns the path to the file.

        Args:
            output_directory: The directory to write the file to. Defaults to the current working directory.
            name: The name of the file to write without the file extension. Defaults to the Workflow's name or a generated name.
            *args: Additional arguments to pass to `yaml.dump`.
            **kwargs: Additional keyword arguments to pass to `yaml.dump`.
        """
        # Strip the trailing dash Argo uses for `generate_name` so the file name is clean
        workflow_name = self.name or (self.generate_name or "workflow").rstrip("-")
        name = name or workflow_name
        output_directory = Path(output_directory)
        output_path = Path(output_directory) / f"{name}.yaml"
        output_directory.mkdir(parents=True, exist_ok=True)
        output_path.write_text(self.to_yaml(*args, **kwargs))
        return output_path.absolute()

active_deadline_seconds class-attribute

active_deadline_seconds: Optional[int] = None

affinity class-attribute

affinity: Optional[Affinity] = None

annotations class-attribute

annotations: Optional[Dict[str, str]] = None

api_version class-attribute

api_version: Optional[str] = None

archive_logs class-attribute

archive_logs: Optional[bool] = None

artifact_gc class-attribute

artifact_gc: Optional[ArtifactGC] = None

artifact_repository_ref class-attribute

artifact_repository_ref: Optional[ArtifactRepositoryRef] = None

automount_service_account_token class-attribute

automount_service_account_token: Optional[bool] = None

cluster_name class-attribute

cluster_name: Optional[str] = None

creation_timestamp class-attribute

creation_timestamp: Optional[Time] = None

deletion_grace_period_seconds class-attribute

deletion_grace_period_seconds: Optional[int] = None

deletion_timestamp class-attribute

deletion_timestamp: Optional[Time] = None

dns_config class-attribute

dns_config: Optional[PodDNSConfig] = None

dns_policy class-attribute

dns_policy: Optional[str] = None

entrypoint class-attribute

entrypoint: Optional[str] = None

executor class-attribute

executor: Optional[ExecutorConfig] = None

finalizers class-attribute

finalizers: Optional[List[str]] = None

generate_name class-attribute

generate_name: Optional[str] = None

generation class-attribute

generation: Optional[int] = None

hooks class-attribute

hooks: Optional[Dict[str, LifecycleHook]] = None

host_aliases class-attribute

host_aliases: Optional[List[HostAlias]] = None

host_network class-attribute

host_network: Optional[bool] = None

image_pull_secrets class-attribute

image_pull_secrets: ImagePullSecrets = None

kind class-attribute

kind: Optional[str] = None

labels class-attribute

labels: Optional[Dict[str, str]] = None

managed_fields class-attribute

managed_fields: Optional[List[ManagedFieldsEntry]] = None

metrics class-attribute

metrics: Optional[Metrics] = None

name class-attribute

name: Optional[str] = None

namespace class-attribute

namespace: Optional[str] = None

node_selector class-attribute

node_selector: Optional[Dict[str, str]] = None

on_exit class-attribute

on_exit: Optional[str] = None

owner_references class-attribute

owner_references: Optional[List[OwnerReference]] = None

parallelism class-attribute

parallelism: Optional[int] = None

pod_disruption_budget class-attribute

pod_disruption_budget: Optional[PodDisruptionBudgetSpec] = None

pod_gc class-attribute

pod_gc: Optional[PodGC] = None

pod_metadata class-attribute

pod_metadata: Optional[Metadata] = None

pod_priority class-attribute

pod_priority: Optional[int] = None

pod_priority_class_name class-attribute

pod_priority_class_name: Optional[str] = None

pod_spec_patch class-attribute

pod_spec_patch: Optional[str] = None

priority class-attribute

priority: Optional[int] = None

resource_version class-attribute

resource_version: Optional[str] = None

retry_strategy class-attribute

retry_strategy: Optional[RetryStrategy] = None

scheduler_name class-attribute

scheduler_name: Optional[str] = None

security_context class-attribute

security_context: Optional[PodSecurityContext] = None
self_link class-attribute

self_link: Optional[str] = None

service_account_name class-attribute

service_account_name: Optional[str] = None

shutdown class-attribute

shutdown: Optional[str] = None

status class-attribute

status: Optional[WorkflowStatus] = None

suspend class-attribute

suspend: Optional[bool] = None

synchronization class-attribute

synchronization: Optional[Synchronization] = None

template_defaults class-attribute

template_defaults: Optional[Template] = None

templates class-attribute

templates: List[Union[Template, Templatable]] = []

tolerations class-attribute

tolerations: Optional[List[Toleration]] = None

ttl_strategy class-attribute

ttl_strategy: Optional[TTLStrategy] = None

uid class-attribute

uid: Optional[str] = None

volume_claim_gc class-attribute

volume_claim_gc: Optional[VolumeClaimGC] = None

volume_claim_templates class-attribute

volume_claim_templates: Optional[List[PersistentVolumeClaim]] = None

workflow_metadata class-attribute

workflow_metadata: Optional[WorkflowMetadata] = None

workflow_template_ref class-attribute

workflow_template_ref: Optional[WorkflowTemplateRef] = None

workflows_service class-attribute

workflows_service: Optional[WorkflowsService] = None

build

build()

Builds the Workflow and its components into an Argo schema Workflow object.

Source code in hera/workflows/workflow.py
def build(self) -> TWorkflow:
    """Builds the Workflow and its components into an Argo schema Workflow object."""
    # Apply registered hooks first; the (possibly modified) result is used for building
    self = self._dispatch_hooks()

    templates = []
    for template in self.templates:
        if isinstance(template, HookMixin):
            template = template._dispatch_hooks()

        if isinstance(template, Templatable):
            templates.append(template._build_template())
        elif isinstance(template, get_args(TTemplate)):
            templates.append(template)
        else:
            raise InvalidType(f"{type(template)} is not a valid template type")

        # Collect any PVC templates the template itself defines and merge them
        # into the workflow-level volume claim templates
        if isinstance(template, VolumeClaimable):
            claims = template._build_persistent_volume_claims()
            # If there are no claims, continue, nothing to add
            if not claims:
                continue
            # If there are no volume claim templates, set them to the constructed claims
            elif self.volume_claim_templates is None:
                self.volume_claim_templates = claims
            else:
                # otherwise, we need to merge the two lists of volume claim templates. This prioritizes the
                # already existing volume claim templates under the assumption that the user has already set
                # a claim template on the workflow intentionally, or the user is sharing the same volumes across
                # different templates
                current_volume_claims_map = {}
                for claim in self.volume_claim_templates:
                    assert claim.metadata is not None, "expected a workflow volume claim with metadata"
                    assert claim.metadata.name is not None, "expected a named workflow volume claim"
                    current_volume_claims_map[claim.metadata.name] = claim

                new_volume_claims_map = {}
                for claim in claims:
                    assert claim.metadata is not None, "expected a volume claim with metadata"
                    assert claim.metadata.name is not None, "expected a named volume claim"
                    new_volume_claims_map[claim.metadata.name] = claim

                for claim_name, claim in new_volume_claims_map.items():
                    if claim_name not in current_volume_claims_map:
                        self.volume_claim_templates.append(claim)

    # Workflow-level claims are appended after any template-derived claims
    workflow_claims = self._build_persistent_volume_claims()
    volume_claim_templates = (self.volume_claim_templates or []) + (workflow_claims or [])
    return _ModelWorkflow(
        api_version=self.api_version,
        kind=self.kind,
        metadata=ObjectMeta(
            annotations=self.annotations,
            cluster_name=self.cluster_name,
            creation_timestamp=self.creation_timestamp,
            deletion_grace_period_seconds=self.deletion_grace_period_seconds,
            deletion_timestamp=self.deletion_timestamp,
            finalizers=self.finalizers,
            generate_name=self.generate_name,
            generation=self.generation,
            labels=self.labels,
            managed_fields=self.managed_fields,
            name=self.name,
            namespace=self.namespace,
            owner_references=self.owner_references,
            resource_version=self.resource_version,
            self_link=self.self_link,
            uid=self.uid,
        ),
        spec=_ModelWorkflowSpec(
            active_deadline_seconds=self.active_deadline_seconds,
            affinity=self.affinity,
            archive_logs=self.archive_logs,
            arguments=self._build_arguments(),
            artifact_gc=self.artifact_gc,
            artifact_repository_ref=self.artifact_repository_ref,
            automount_service_account_token=self.automount_service_account_token,
            dns_config=self.dns_config,
            dns_policy=self.dns_policy,
            entrypoint=self.entrypoint,
            executor=self.executor,
            hooks=self.hooks,
            host_aliases=self.host_aliases,
            host_network=self.host_network,
            image_pull_secrets=self.image_pull_secrets,
            metrics=self.metrics,
            node_selector=self.node_selector,
            on_exit=self.on_exit,
            parallelism=self.parallelism,
            pod_disruption_budget=self.pod_disruption_budget,
            pod_gc=self.pod_gc,
            pod_metadata=self.pod_metadata,
            pod_priority=self.pod_priority,
            pod_priority_class_name=self.pod_priority_class_name,
            pod_spec_patch=self.pod_spec_patch,
            priority=self.priority,
            retry_strategy=self.retry_strategy,
            scheduler_name=self.scheduler_name,
            security_context=self.security_context,
            service_account_name=self.service_account_name,
            shutdown=self.shutdown,
            suspend=self.suspend,
            synchronization=self.synchronization,
            template_defaults=self.template_defaults,
            templates=templates or None,
            tolerations=self.tolerations,
            ttl_strategy=self.ttl_strategy,
            volume_claim_gc=self.volume_claim_gc,
            volume_claim_templates=volume_claim_templates or None,
            volumes=self._build_volumes(),
            workflow_metadata=self.workflow_metadata,
            workflow_template_ref=self.workflow_template_ref,
        ),
        status=self.status,
    )

create

create()

Creates the Workflow on the Argo cluster.

Source code in hera/workflows/workflow.py
def create(self) -> TWorkflow:
    """Creates the Workflow on the Argo cluster."""
    assert self.workflows_service, "workflow service not initialized"
    assert self.namespace, "workflow namespace not defined"
    request = WorkflowCreateRequest(workflow=self.build())
    return self.workflows_service.create_workflow(self.namespace, request)

lint

lint()

Lints the Workflow using the Argo cluster.

Source code in hera/workflows/workflow.py
def lint(self) -> TWorkflow:
    """Lints the Workflow using the Argo cluster."""
    assert self.workflows_service, "workflow service not initialized"
    assert self.namespace, "workflow namespace not defined"
    request = WorkflowLintRequest(workflow=self.build())
    return self.workflows_service.lint_workflow(self.namespace, request)

to_dict

to_dict()

Builds the Workflow as an Argo schema Workflow object and returns it as a dictionary.

Source code in hera/workflows/workflow.py
def to_dict(self) -> Any:
    """Builds the Workflow as an Argo schema Workflow object and returns it as a dictionary."""
    built = self.build()
    # Aliased keys and no null entries match the Argo-facing representation
    return built.dict(exclude_none=True, by_alias=True)

to_file

to_file(output_directory='.', name='', *args, **kwargs)

Writes the Workflow as an Argo schema Workflow object to a YAML file and returns the path to the file.

Parameters:

Name Type Description Default
output_directory Union[Path, str]

The directory to write the file to. Defaults to the current working directory.

'.'
name str

The name of the file to write without the file extension. Defaults to the Workflow’s name or a generated name.

''
*args

Additional arguments to pass to yaml.dump.

()
**kwargs

Additional keyword arguments to pass to yaml.dump.

{}
Source code in hera/workflows/workflow.py
def to_file(self, output_directory: Union[Path, str] = ".", name: str = "", *args, **kwargs) -> Path:
    """Writes the Workflow as an Argo schema Workflow object to a YAML file and returns the path to the file.

    Args:
        output_directory: The directory to write the file to. Defaults to the current working directory.
        name: The name of the file to write without the file extension. Defaults to the Workflow's name or a generated name.
        *args: Additional arguments to pass to `yaml.dump`.
        **kwargs: Additional keyword arguments to pass to `yaml.dump`.
    """
    # Strip the trailing dash Argo uses for `generate_name` so the file name is clean
    fallback_name = (self.generate_name or "workflow").rstrip("-")
    name = name or self.name or fallback_name
    directory = Path(output_directory)
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / f"{name}.yaml"
    target.write_text(self.to_yaml(*args, **kwargs))
    return target.absolute()

to_yaml

to_yaml(*args, **kwargs)

Builds the Workflow as an Argo schema Workflow object and returns it as yaml string.

Source code in hera/workflows/workflow.py
def to_yaml(self, *args, **kwargs) -> str:
    """Builds the Workflow as an Argo schema Workflow object and returns it as yaml string.

    Raises:
        ImportError: when PyYAML is not installed.
    """
    if not _yaml:
        raise ImportError("PyYAML is not installed")
    # Default to block-style output with insertion-ordered keys unless the caller overrides
    for option, default in (("default_flow_style", False), ("sort_keys", False)):
        kwargs.setdefault(option, default)
    return _yaml.dump(self.to_dict(), *args, **kwargs)

WorkflowStatus

Placeholder for workflow statuses

Source code in hera/workflows/workflow_status.py
class WorkflowStatus(str, Enum):
    """Placeholder for workflow statuses"""

    running = "Running"
    succeeded = "Succeeded"
    failed = "Failed"
    error = "Error"
    terminated = "Terminated"

    def __str__(self):
        return str(self.value)

    @classmethod
    def from_argo_status(cls, s: str) -> "WorkflowStatus":
        """Turns an Argo status into a Hera workflow status representation"""
        # Build the lookup from the enum itself so the two never drift apart
        switch = {member.value: member for member in cls}

        member = switch.get(s)
        if member is None:
            raise KeyError(f"Unrecognized status {s}. " f"Available Argo statuses are: {list(switch.keys())}")
        return member

error class-attribute

error = 'Error'

failed class-attribute

failed = 'Failed'

running class-attribute

running = 'Running'

succeeded class-attribute

succeeded = 'Succeeded'

terminated class-attribute

terminated = 'Terminated'

from_argo_status classmethod

from_argo_status(s)

Turns an Argo status into a Hera workflow status representation

Source code in hera/workflows/workflow_status.py
@classmethod
def from_argo_status(cls, s: str) -> "WorkflowStatus":
    """Turns an Argo status into a Hera workflow status representation

    Raises:
        KeyError: when `s` is not one of the recognized Argo statuses.
    """
    # Map each Argo status string to its Hera enum member
    switch = {
        "Running": WorkflowStatus.running,
        "Succeeded": WorkflowStatus.succeeded,
        "Failed": WorkflowStatus.failed,
        "Error": WorkflowStatus.error,
        "Terminated": WorkflowStatus.terminated,
    }

    ss = switch.get(s)
    if not ss:
        raise KeyError(f"Unrecognized status {s}. " f"Available Argo statuses are: {list(switch.keys())}")
    return ss

WorkflowTemplate

WorkflowTemplates are definitions of Workflows that live in your cluster. This allows you to create a library of frequently-used templates and reuse them by referencing them from your Workflows.

Source code in hera/workflows/workflow_template.py
class WorkflowTemplate(Workflow):
    """WorkflowTemplates are definitions of Workflows that live in your cluster.

    This allows you to create a library of frequently-used templates and reuse them by
    referencing them from your Workflows.
    """

    # WorkflowTemplate fields match Workflow exactly except for `status`, which WorkflowTemplate
    # does not have - https://argoproj.github.io/argo-workflows/fields/#workflowtemplate
    @validator("status", pre=True, always=True)
    def _set_status(cls, v):
        """Reject any user-supplied `status`; the field must remain unset on templates."""
        if v is not None:
            raise ValueError("status is not a valid field on a WorkflowTemplate")

    def create(self) -> TWorkflow:
        """Creates the WorkflowTemplate on the Argo cluster."""
        assert self.workflows_service, "workflow service not initialized"
        assert self.namespace, "workflow namespace not defined"
        return self.workflows_service.create_workflow_template(
            self.namespace, WorkflowTemplateCreateRequest(workflow=self.build())
        )

    def lint(self) -> TWorkflow:
        """Lints the WorkflowTemplate using the Argo cluster."""
        assert self.workflows_service, "workflow service not initialized"
        assert self.namespace, "workflow namespace not defined"
        return self.workflows_service.lint_workflow_template(
            self.namespace, WorkflowTemplateLintRequest(workflow=self.build())
        )

    def _build_templates(self) -> list:
        """Builds the list of Argo template objects from `self.templates`.

        Raises
        ------
        InvalidType
            If an entry is neither `Templatable` nor an Argo template model.
        """
        templates = []
        for template in self.templates:
            # Give hook-aware templates a chance to transform themselves first.
            if isinstance(template, HookMixin):
                template = template._dispatch_hooks()

            if isinstance(template, Templatable):
                templates.append(template._build_template())
            elif isinstance(template, get_args(TTemplate)):
                templates.append(template)
            else:
                raise InvalidType(f"{type(template)} is not a valid template type")
        return templates

    def build(self) -> TWorkflow:
        """Builds the WorkflowTemplate and its components into an Argo schema WorkflowTemplate object."""
        # Hooks may return a modified copy of the workflow; rebind so the rest
        # of the build uses the post-hook state.
        self = self._dispatch_hooks()
        templates = self._build_templates()

        return _ModelWorkflowTemplate(
            api_version=self.api_version,
            kind=self.kind,
            metadata=ObjectMeta(
                annotations=self.annotations,
                cluster_name=self.cluster_name,
                creation_timestamp=self.creation_timestamp,
                deletion_grace_period_seconds=self.deletion_grace_period_seconds,
                deletion_timestamp=self.deletion_timestamp,
                finalizers=self.finalizers,
                generate_name=self.generate_name,
                generation=self.generation,
                labels=self.labels,
                managed_fields=self.managed_fields,
                name=self.name,
                namespace=self.namespace,
                owner_references=self.owner_references,
                resource_version=self.resource_version,
                self_link=self.self_link,
                uid=self.uid,
            ),
            spec=_ModelWorkflowSpec(
                active_deadline_seconds=self.active_deadline_seconds,
                affinity=self.affinity,
                archive_logs=self.archive_logs,
                arguments=self._build_arguments(),
                artifact_gc=self.artifact_gc,
                artifact_repository_ref=self.artifact_repository_ref,
                automount_service_account_token=self.automount_service_account_token,
                dns_config=self.dns_config,
                dns_policy=self.dns_policy,
                entrypoint=self.entrypoint,
                executor=self.executor,
                hooks=self.hooks,
                host_aliases=self.host_aliases,
                host_network=self.host_network,
                image_pull_secrets=self.image_pull_secrets,
                metrics=self.metrics,
                node_selector=self.node_selector,
                on_exit=self.on_exit,
                parallelism=self.parallelism,
                pod_disruption_budget=self.pod_disruption_budget,
                pod_gc=self.pod_gc,
                pod_metadata=self.pod_metadata,
                pod_priority=self.pod_priority,
                pod_priority_class_name=self.pod_priority_class_name,
                pod_spec_patch=self.pod_spec_patch,
                priority=self.priority,
                retry_strategy=self.retry_strategy,
                scheduler_name=self.scheduler_name,
                security_context=self.security_context,
                service_account_name=self.service_account_name,
                shutdown=self.shutdown,
                suspend=self.suspend,
                synchronization=self.synchronization,
                template_defaults=self.template_defaults,
                templates=templates or None,
                tolerations=self.tolerations,
                ttl_strategy=self.ttl_strategy,
                volume_claim_gc=self.volume_claim_gc,
                volume_claim_templates=self.volume_claim_templates,
                volumes=self.volumes,
                workflow_metadata=self.workflow_metadata,
                workflow_template_ref=self.workflow_template_ref,
            ),
        )

build

build()

Builds the WorkflowTemplate and its components into an Argo schema WorkflowTemplate object.

Source code in hera/workflows/workflow_template.py
def build(self) -> TWorkflow:
    """Builds the WorkflowTemplate and its components into an Argo schema WorkflowTemplate object."""
    self = self._dispatch_hooks()

    templates = []
    for template in self.templates:
        if isinstance(template, HookMixin):
            template = template._dispatch_hooks()

        if isinstance(template, Templatable):
            templates.append(template._build_template())
        elif isinstance(template, get_args(TTemplate)):
            templates.append(template)
        else:
            raise InvalidType(f"{type(template)} is not a valid template type")

    return _ModelWorkflowTemplate(
        api_version=self.api_version,
        kind=self.kind,
        metadata=ObjectMeta(
            annotations=self.annotations,
            cluster_name=self.cluster_name,
            creation_timestamp=self.creation_timestamp,
            deletion_grace_period_seconds=self.deletion_grace_period_seconds,
            deletion_timestamp=self.deletion_timestamp,
            finalizers=self.finalizers,
            generate_name=self.generate_name,
            generation=self.generation,
            labels=self.labels,
            managed_fields=self.managed_fields,
            name=self.name,
            namespace=self.namespace,
            owner_references=self.owner_references,
            resource_version=self.resource_version,
            self_link=self.self_link,
            uid=self.uid,
        ),
        spec=_ModelWorkflowSpec(
            active_deadline_seconds=self.active_deadline_seconds,
            affinity=self.affinity,
            archive_logs=self.archive_logs,
            arguments=self._build_arguments(),
            artifact_gc=self.artifact_gc,
            artifact_repository_ref=self.artifact_repository_ref,
            automount_service_account_token=self.automount_service_account_token,
            dns_config=self.dns_config,
            dns_policy=self.dns_policy,
            entrypoint=self.entrypoint,
            executor=self.executor,
            hooks=self.hooks,
            host_aliases=self.host_aliases,
            host_network=self.host_network,
            image_pull_secrets=self.image_pull_secrets,
            metrics=self.metrics,
            node_selector=self.node_selector,
            on_exit=self.on_exit,
            parallelism=self.parallelism,
            pod_disruption_budget=self.pod_disruption_budget,
            pod_gc=self.pod_gc,
            pod_metadata=self.pod_metadata,
            pod_priority=self.pod_priority,
            pod_priority_class_name=self.pod_priority_class_name,
            pod_spec_patch=self.pod_spec_patch,
            priority=self.priority,
            retry_strategy=self.retry_strategy,
            scheduler_name=self.scheduler_name,
            security_context=self.security_context,
            service_account_name=self.service_account_name,
            shutdown=self.shutdown,
            suspend=self.suspend,
            synchronization=self.synchronization,
            template_defaults=self.template_defaults,
            templates=templates or None,
            tolerations=self.tolerations,
            ttl_strategy=self.ttl_strategy,
            volume_claim_gc=self.volume_claim_gc,
            volume_claim_templates=self.volume_claim_templates,
            volumes=self.volumes,
            workflow_metadata=self.workflow_metadata,
            workflow_template_ref=self.workflow_template_ref,
        ),
    )

create

create()

Creates the WorkflowTemplate on the Argo cluster.

Source code in hera/workflows/workflow_template.py
def create(self) -> TWorkflow:
    """Creates the WorkflowTemplate on the Argo cluster."""
    assert self.workflows_service, "workflow service not initialized"
    assert self.namespace, "workflow namespace not defined"
    return self.workflows_service.create_workflow_template(
        self.namespace, WorkflowTemplateCreateRequest(workflow=self.build())
    )

lint

lint()

Lints the WorkflowTemplate using the Argo cluster.

Source code in hera/workflows/workflow_template.py
def lint(self) -> TWorkflow:
    """Lints the WorkflowTemplate using the Argo cluster."""
    assert self.workflows_service, "workflow service not initialized"
    assert self.namespace, "workflow namespace not defined"
    return self.workflows_service.lint_workflow_template(
        self.namespace, WorkflowTemplateLintRequest(workflow=self.build())
    )

WorkflowsService

Source code in hera/workflows/service.py
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
class WorkflowsService:
    def __init__(
        self,
        host: Optional[str] = None,
        verify_ssl: Optional[bool] = None,
        token: Optional[str] = None,
        namespace: Optional[str] = None,
    ):
        """Initialize the service, falling back to `global_config` for any unset value.

        Parameters
        ----------
        host: Optional[str]
            Argo server host; defaults to `global_config.host`.
        verify_ssl: Optional[bool]
            Whether to verify SSL certificates; defaults to `global_config.verify_ssl`.
        token: Optional[str]
            Bearer token for authentication; defaults to `global_config.token`.
        namespace: Optional[str]
            Default namespace for requests; defaults to `global_config.namespace`.
        """
        self.host = cast(str, host or global_config.host)
        # `verify_ssl=False` is a meaningful value; `or` would silently discard it
        # and re-enable verification from the global config, so only fall back
        # when the argument is truly unset (None).
        self.verify_ssl = verify_ssl if verify_ssl is not None else global_config.verify_ssl
        self.token = token or global_config.token
        self.namespace = namespace or global_config.namespace

    def list_archived_workflows(
        self,
        label_selector: Optional[str] = None,
        field_selector: Optional[str] = None,
        watch: Optional[bool] = None,
        allow_watch_bookmarks: Optional[bool] = None,
        resource_version: Optional[str] = None,
        resource_version_match: Optional[str] = None,
        timeout_seconds: Optional[str] = None,
        limit: Optional[str] = None,
        continue_: Optional[str] = None,
        name_prefix: Optional[str] = None,
    ) -> WorkflowList:
        """List archived workflows, optionally filtered by the given list options."""
        query = {
            "listOptions.labelSelector": label_selector,
            "listOptions.fieldSelector": field_selector,
            "listOptions.watch": watch,
            "listOptions.allowWatchBookmarks": allow_watch_bookmarks,
            "listOptions.resourceVersion": resource_version,
            "listOptions.resourceVersionMatch": resource_version_match,
            "listOptions.timeoutSeconds": timeout_seconds,
            "listOptions.limit": limit,
            "listOptions.continue": continue_,
            "namePrefix": name_prefix,
        }
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/archived-workflows")
        resp = requests.get(url=endpoint, params=query, headers=auth, data=None, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return WorkflowList(**resp.json())

    def list_archived_workflow_label_keys(self) -> LabelKeys:
        """Return the label keys present across archived workflows."""
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/archived-workflows-label-keys")
        resp = requests.get(url=endpoint, params=None, headers=auth, data=None, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return LabelKeys(**resp.json())

    def list_archived_workflow_label_values(
        self,
        label_selector: Optional[str] = None,
        field_selector: Optional[str] = None,
        watch: Optional[bool] = None,
        allow_watch_bookmarks: Optional[bool] = None,
        resource_version: Optional[str] = None,
        resource_version_match: Optional[str] = None,
        timeout_seconds: Optional[str] = None,
        limit: Optional[str] = None,
        continue_: Optional[str] = None,
    ) -> LabelValues:
        """Return the label values for archived workflows matching the list options."""
        query = {
            "listOptions.labelSelector": label_selector,
            "listOptions.fieldSelector": field_selector,
            "listOptions.watch": watch,
            "listOptions.allowWatchBookmarks": allow_watch_bookmarks,
            "listOptions.resourceVersion": resource_version,
            "listOptions.resourceVersionMatch": resource_version_match,
            "listOptions.timeoutSeconds": timeout_seconds,
            "listOptions.limit": limit,
            "listOptions.continue": continue_,
        }
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/archived-workflows-label-values")
        resp = requests.get(url=endpoint, params=query, headers=auth, data=None, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return LabelValues(**resp.json())

    def get_archived_workflow(self, uid: str) -> Workflow:
        """Retrieve a single archived workflow by its UID."""
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/archived-workflows/{uid}").format(uid=uid)
        resp = requests.get(url=endpoint, params=None, headers=auth, data=None, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return Workflow(**resp.json())

    def delete_archived_workflow(self, uid: str) -> ArchivedWorkflowDeletedResponse:
        """Delete an archived workflow by its UID."""
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/archived-workflows/{uid}").format(uid=uid)
        resp = requests.delete(url=endpoint, params=None, headers=auth, data=None, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return ArchivedWorkflowDeletedResponse()

    def resubmit_archived_workflow(self, uid: str, req: ResubmitArchivedWorkflowRequest) -> Workflow:
        """Resubmit an archived workflow, returning the newly created workflow."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/archived-workflows/{uid}/resubmit").format(uid=uid)
        resp = requests.put(url=endpoint, params=None, headers=auth, data=payload, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return Workflow(**resp.json())

    def retry_archived_workflow(self, uid: str, req: RetryArchivedWorkflowRequest) -> Workflow:
        """Retry an archived workflow, returning the newly created workflow."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/archived-workflows/{uid}/retry").format(uid=uid)
        resp = requests.put(url=endpoint, params=None, headers=auth, data=payload, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return Workflow(**resp.json())

    def list_cluster_workflow_templates(
        self,
        label_selector: Optional[str] = None,
        field_selector: Optional[str] = None,
        watch: Optional[bool] = None,
        allow_watch_bookmarks: Optional[bool] = None,
        resource_version: Optional[str] = None,
        resource_version_match: Optional[str] = None,
        timeout_seconds: Optional[str] = None,
        limit: Optional[str] = None,
        continue_: Optional[str] = None,
    ) -> ClusterWorkflowTemplateList:
        """List cluster workflow templates matching the given list options."""
        query = {
            "listOptions.labelSelector": label_selector,
            "listOptions.fieldSelector": field_selector,
            "listOptions.watch": watch,
            "listOptions.allowWatchBookmarks": allow_watch_bookmarks,
            "listOptions.resourceVersion": resource_version,
            "listOptions.resourceVersionMatch": resource_version_match,
            "listOptions.timeoutSeconds": timeout_seconds,
            "listOptions.limit": limit,
            "listOptions.continue": continue_,
        }
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/cluster-workflow-templates")
        resp = requests.get(url=endpoint, params=query, headers=auth, data=None, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return ClusterWorkflowTemplateList(**resp.json())

    def create_cluster_workflow_template(self, req: ClusterWorkflowTemplateCreateRequest) -> ClusterWorkflowTemplate:
        """Create a cluster workflow template on the Argo cluster."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/cluster-workflow-templates")
        resp = requests.post(url=endpoint, params=None, headers=auth, data=payload, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return ClusterWorkflowTemplate(**resp.json())

    def lint_cluster_workflow_template(self, req: ClusterWorkflowTemplateLintRequest) -> ClusterWorkflowTemplate:
        """Lint a cluster workflow template via the Argo server without creating it."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/cluster-workflow-templates/lint")
        resp = requests.post(url=endpoint, params=None, headers=auth, data=payload, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return ClusterWorkflowTemplate(**resp.json())

    def get_cluster_workflow_template(
        self, name: str, resource_version: Optional[str] = None
    ) -> ClusterWorkflowTemplate:
        """Fetch a cluster workflow template by name, optionally at a specific resource version."""
        query = {"getOptions.resourceVersion": resource_version}
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/cluster-workflow-templates/{name}").format(name=name)
        resp = requests.get(url=endpoint, params=query, headers=auth, data=None, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return ClusterWorkflowTemplate(**resp.json())

    def update_cluster_workflow_template(
        self, name: str, req: ClusterWorkflowTemplateUpdateRequest
    ) -> ClusterWorkflowTemplate:
        """Update the named cluster workflow template on the Argo cluster."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/cluster-workflow-templates/{name}").format(name=name)
        resp = requests.put(url=endpoint, params=None, headers=auth, data=payload, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return ClusterWorkflowTemplate(**resp.json())

    def delete_cluster_workflow_template(
        self,
        name: str,
        grace_period_seconds: Optional[str] = None,
        uid: Optional[str] = None,
        resource_version: Optional[str] = None,
        orphan_dependents: Optional[bool] = None,
        propagation_policy: Optional[str] = None,
        dry_run: Optional[list] = None,
    ) -> ClusterWorkflowTemplateDeleteResponse:
        """Delete the named cluster workflow template, honoring the given delete options."""
        query = {
            "deleteOptions.gracePeriodSeconds": grace_period_seconds,
            "deleteOptions.preconditions.uid": uid,
            "deleteOptions.preconditions.resourceVersion": resource_version,
            "deleteOptions.orphanDependents": orphan_dependents,
            "deleteOptions.propagationPolicy": propagation_policy,
            "deleteOptions.dryRun": dry_run,
        }
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/cluster-workflow-templates/{name}").format(name=name)
        resp = requests.delete(url=endpoint, params=query, headers=auth, data=None, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return ClusterWorkflowTemplateDeleteResponse()

    def list_cron_workflows(
        self,
        namespace: str,
        label_selector: Optional[str] = None,
        field_selector: Optional[str] = None,
        watch: Optional[bool] = None,
        allow_watch_bookmarks: Optional[bool] = None,
        resource_version: Optional[str] = None,
        resource_version_match: Optional[str] = None,
        timeout_seconds: Optional[str] = None,
        limit: Optional[str] = None,
        continue_: Optional[str] = None,
    ) -> CronWorkflowList:
        """List cron workflows in the given namespace, filtered by the list options."""
        query = {
            "listOptions.labelSelector": label_selector,
            "listOptions.fieldSelector": field_selector,
            "listOptions.watch": watch,
            "listOptions.allowWatchBookmarks": allow_watch_bookmarks,
            "listOptions.resourceVersion": resource_version,
            "listOptions.resourceVersionMatch": resource_version_match,
            "listOptions.timeoutSeconds": timeout_seconds,
            "listOptions.limit": limit,
            "listOptions.continue": continue_,
        }
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/cron-workflows/{namespace}").format(namespace=namespace)
        resp = requests.get(url=endpoint, params=query, headers=auth, data=None, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return CronWorkflowList(**resp.json())

    def create_cron_workflow(self, namespace: str, req: CreateCronWorkflowRequest) -> CronWorkflow:
        """Create a cron workflow in the given namespace."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/cron-workflows/{namespace}").format(namespace=namespace)
        resp = requests.post(url=endpoint, params=None, headers=auth, data=payload, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return CronWorkflow(**resp.json())

    def lint_cron_workflow(self, namespace: str, req: LintCronWorkflowRequest) -> CronWorkflow:
        """Lint a cron workflow via the Argo server without creating it."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/cron-workflows/{namespace}/lint").format(namespace=namespace)
        resp = requests.post(url=endpoint, params=None, headers=auth, data=payload, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return CronWorkflow(**resp.json())

    def get_cron_workflow(self, namespace: str, name: str, resource_version: Optional[str] = None) -> CronWorkflow:
        """Fetch a cron workflow by namespace and name, optionally at a specific resource version."""
        query = {"getOptions.resourceVersion": resource_version}
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/cron-workflows/{namespace}/{name}").format(
            namespace=namespace, name=name
        )
        resp = requests.get(url=endpoint, params=query, headers=auth, data=None, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return CronWorkflow(**resp.json())

    def update_cron_workflow(self, namespace: str, name: str, req: UpdateCronWorkflowRequest) -> CronWorkflow:
        """Update the named cron workflow in the given namespace."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/cron-workflows/{namespace}/{name}").format(
            namespace=namespace, name=name
        )
        resp = requests.put(url=endpoint, params=None, headers=auth, data=payload, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return CronWorkflow(**resp.json())

    def delete_cron_workflow(
        self,
        namespace: str,
        name: str,
        grace_period_seconds: Optional[str] = None,
        uid: Optional[str] = None,
        resource_version: Optional[str] = None,
        orphan_dependents: Optional[bool] = None,
        propagation_policy: Optional[str] = None,
        dry_run: Optional[list] = None,
    ) -> CronWorkflowDeletedResponse:
        """Delete the named cron workflow, honoring the given delete options."""
        query = {
            "deleteOptions.gracePeriodSeconds": grace_period_seconds,
            "deleteOptions.preconditions.uid": uid,
            "deleteOptions.preconditions.resourceVersion": resource_version,
            "deleteOptions.orphanDependents": orphan_dependents,
            "deleteOptions.propagationPolicy": propagation_policy,
            "deleteOptions.dryRun": dry_run,
        }
        auth = {"Authorization": f"Bearer {self.token}"}
        endpoint = os.path.join(self.host, "api/v1/cron-workflows/{namespace}/{name}").format(
            namespace=namespace, name=name
        )
        resp = requests.delete(url=endpoint, params=query, headers=auth, data=None, verify=self.verify_ssl)

        if not resp.ok:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")
        return CronWorkflowDeletedResponse()

    def resume_cron_workflow(self, namespace: str, name: str, req: CronWorkflowResumeRequest) -> CronWorkflow:
        """Resume a previously suspended cron workflow."""
        url = os.path.join(self.host, "api/v1/cron-workflows/{namespace}/{name}/resume").format(
            namespace=namespace, name=name
        )
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.put(
            url=url,
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return CronWorkflow(**response.json())

    def suspend_cron_workflow(self, namespace: str, name: str, req: CronWorkflowSuspendRequest) -> CronWorkflow:
        """Suspend the named cron workflow so no further runs are scheduled."""
        url = os.path.join(self.host, "api/v1/cron-workflows/{namespace}/{name}/suspend").format(
            namespace=namespace, name=name
        )
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.put(
            url=url,
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return CronWorkflow(**response.json())

    def get_info(self) -> InfoResponse:
        """Return the Argo server's `/api/v1/info` payload.

        Fix: the response body was previously discarded and a bare
        ``InfoResponse()`` returned; parse the JSON body into the model, as
        every other GET endpoint in this service does (cf. ``get_version``).
        """
        resp = requests.get(
            url=os.path.join(self.host, "api/v1/info"),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )

        if resp.ok:
            return InfoResponse(**resp.json())
        else:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

    def get_user_info(self) -> GetUserInfoResponse:
        """Return the authenticated user's info from `/api/v1/userinfo`.

        Fix: the response body was previously discarded and a bare
        ``GetUserInfoResponse()`` returned; parse the JSON body into the
        model, consistent with the other GET endpoints in this service.
        """
        resp = requests.get(
            url=os.path.join(self.host, "api/v1/userinfo"),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )

        if resp.ok:
            return GetUserInfoResponse(**resp.json())
        else:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

    def get_version(self) -> Version:
        """Return the Argo server's version information."""
        response = requests.get(
            url=os.path.join(self.host, "api/v1/version"),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return Version(**response.json())

    def list_workflow_templates(
        self,
        namespace: str,
        label_selector: Optional[str] = None,
        field_selector: Optional[str] = None,
        watch: Optional[bool] = None,
        allow_watch_bookmarks: Optional[bool] = None,
        resource_version: Optional[str] = None,
        resource_version_match: Optional[str] = None,
        timeout_seconds: Optional[str] = None,
        limit: Optional[str] = None,
        continue_: Optional[str] = None,
    ) -> WorkflowTemplateList:
        """List workflow templates in a namespace, forwarding the Kubernetes list options."""
        list_options = {
            "listOptions.labelSelector": label_selector,
            "listOptions.fieldSelector": field_selector,
            "listOptions.watch": watch,
            "listOptions.allowWatchBookmarks": allow_watch_bookmarks,
            "listOptions.resourceVersion": resource_version,
            "listOptions.resourceVersionMatch": resource_version_match,
            "listOptions.timeoutSeconds": timeout_seconds,
            "listOptions.limit": limit,
            "listOptions.continue": continue_,
        }
        response = requests.get(
            url=os.path.join(self.host, "api/v1/workflow-templates/{namespace}").format(namespace=namespace),
            params=list_options,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return WorkflowTemplateList(**response.json())

    def create_workflow_template(self, namespace: str, req: WorkflowTemplateCreateRequest) -> WorkflowTemplate:
        """Create a workflow template in the given namespace."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.post(
            url=os.path.join(self.host, "api/v1/workflow-templates/{namespace}").format(namespace=namespace),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return WorkflowTemplate(**response.json())

    def lint_workflow_template(self, namespace: str, req: WorkflowTemplateLintRequest) -> WorkflowTemplate:
        """Lint a workflow template server-side without persisting it."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.post(
            url=os.path.join(self.host, "api/v1/workflow-templates/{namespace}/lint").format(namespace=namespace),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return WorkflowTemplate(**response.json())

    def get_workflow_template(
        self, namespace: str, name: str, resource_version: Optional[str] = None
    ) -> WorkflowTemplate:
        """Fetch a single workflow template by name from the given namespace."""
        url = os.path.join(self.host, "api/v1/workflow-templates/{namespace}/{name}").format(
            namespace=namespace, name=name
        )
        response = requests.get(
            url=url,
            params={"getOptions.resourceVersion": resource_version},
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return WorkflowTemplate(**response.json())

    def update_workflow_template(
        self, namespace: str, name: str, req: WorkflowTemplateUpdateRequest
    ) -> WorkflowTemplate:
        """Update the named workflow template with the given request payload."""
        url = os.path.join(self.host, "api/v1/workflow-templates/{namespace}/{name}").format(
            namespace=namespace, name=name
        )
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.put(
            url=url,
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return WorkflowTemplate(**response.json())

    def delete_workflow_template(
        self,
        namespace: str,
        name: str,
        grace_period_seconds: Optional[str] = None,
        uid: Optional[str] = None,
        resource_version: Optional[str] = None,
        orphan_dependents: Optional[bool] = None,
        propagation_policy: Optional[str] = None,
        dry_run: Optional[list] = None,
    ) -> WorkflowTemplateDeleteResponse:
        """Delete the named workflow template, forwarding the standard Kubernetes delete options."""
        url = os.path.join(self.host, "api/v1/workflow-templates/{namespace}/{name}").format(
            namespace=namespace, name=name
        )
        delete_options = {
            "deleteOptions.gracePeriodSeconds": grace_period_seconds,
            "deleteOptions.preconditions.uid": uid,
            "deleteOptions.preconditions.resourceVersion": resource_version,
            "deleteOptions.orphanDependents": orphan_dependents,
            "deleteOptions.propagationPolicy": propagation_policy,
            "deleteOptions.dryRun": dry_run,
        }
        response = requests.delete(
            url=url,
            params=delete_options,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return WorkflowTemplateDeleteResponse()

    def list_workflows(
        self,
        namespace: str,
        label_selector: Optional[str] = None,
        field_selector: Optional[str] = None,
        watch: Optional[bool] = None,
        allow_watch_bookmarks: Optional[bool] = None,
        resource_version: Optional[str] = None,
        resource_version_match: Optional[str] = None,
        timeout_seconds: Optional[str] = None,
        limit: Optional[str] = None,
        continue_: Optional[str] = None,
        fields: Optional[str] = None,
    ) -> WorkflowList:
        """List workflows in a namespace, forwarding Kubernetes list options plus a field filter."""
        query = {
            "listOptions.labelSelector": label_selector,
            "listOptions.fieldSelector": field_selector,
            "listOptions.watch": watch,
            "listOptions.allowWatchBookmarks": allow_watch_bookmarks,
            "listOptions.resourceVersion": resource_version,
            "listOptions.resourceVersionMatch": resource_version_match,
            "listOptions.timeoutSeconds": timeout_seconds,
            "listOptions.limit": limit,
            "listOptions.continue": continue_,
            "fields": fields,
        }
        response = requests.get(
            url=os.path.join(self.host, "api/v1/workflows/{namespace}").format(namespace=namespace),
            params=query,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return WorkflowList(**response.json())

    def create_workflow(self, namespace: str, req: WorkflowCreateRequest) -> Workflow:
        """Create a workflow in the given namespace."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.post(
            url=os.path.join(self.host, "api/v1/workflows/{namespace}").format(namespace=namespace),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return Workflow(**response.json())

    def lint_workflow(self, namespace: str, req: WorkflowLintRequest) -> Workflow:
        """Lint a workflow server-side without persisting it."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.post(
            url=os.path.join(self.host, "api/v1/workflows/{namespace}/lint").format(namespace=namespace),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return Workflow(**response.json())

    def submit_workflow(self, namespace: str, req: WorkflowSubmitRequest) -> Workflow:
        """Submit a workflow for execution in the given namespace."""
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.post(
            url=os.path.join(self.host, "api/v1/workflows/{namespace}/submit").format(namespace=namespace),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return Workflow(**response.json())

    def get_workflow(
        self, namespace: str, name: str, resource_version: Optional[str] = None, fields: Optional[str] = None
    ) -> Workflow:
        """Fetch a single workflow by name, optionally restricting the returned fields."""
        response = requests.get(
            url=os.path.join(self.host, "api/v1/workflows/{namespace}/{name}").format(namespace=namespace, name=name),
            params={"getOptions.resourceVersion": resource_version, "fields": fields},
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return Workflow(**response.json())

    def delete_workflow(
        self,
        namespace: str,
        name: str,
        grace_period_seconds: Optional[str] = None,
        uid: Optional[str] = None,
        resource_version: Optional[str] = None,
        orphan_dependents: Optional[bool] = None,
        propagation_policy: Optional[str] = None,
        dry_run: Optional[list] = None,
        force: Optional[bool] = None,
    ) -> WorkflowDeleteResponse:
        """Delete the named workflow, forwarding Kubernetes delete options and the force flag."""
        query = {
            "deleteOptions.gracePeriodSeconds": grace_period_seconds,
            "deleteOptions.preconditions.uid": uid,
            "deleteOptions.preconditions.resourceVersion": resource_version,
            "deleteOptions.orphanDependents": orphan_dependents,
            "deleteOptions.propagationPolicy": propagation_policy,
            "deleteOptions.dryRun": dry_run,
            "force": force,
        }
        response = requests.delete(
            url=os.path.join(self.host, "api/v1/workflows/{namespace}/{name}").format(namespace=namespace, name=name),
            params=query,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return WorkflowDeleteResponse()

    def workflow_logs(
        self,
        namespace: str,
        name: str,
        pod_name: Optional[str] = None,
        container: Optional[str] = None,
        follow: Optional[bool] = None,
        previous: Optional[bool] = None,
        since_seconds: Optional[str] = None,
        seconds: Optional[str] = None,
        nanos: Optional[int] = None,
        timestamps: Optional[bool] = None,
        tail_lines: Optional[str] = None,
        limit_bytes: Optional[str] = None,
        insecure_skip_tls_verify_backend: Optional[bool] = None,
        grep: Optional[str] = None,
        selector: Optional[str] = None,
    ) -> V1alpha1LogEntry:
        """Fetch logs for a workflow, forwarding the Kubernetes pod log options.

        NOTE(review): the response is parsed with a single ``resp.json()`` call;
        confirm this endpoint returns one JSON document rather than a stream of
        newline-delimited entries.
        """
        url = os.path.join(self.host, "api/v1/workflows/{namespace}/{name}/log").format(
            namespace=namespace, name=name
        )
        log_options = {
            "podName": pod_name,
            "logOptions.container": container,
            "logOptions.follow": follow,
            "logOptions.previous": previous,
            "logOptions.sinceSeconds": since_seconds,
            "logOptions.sinceTime.seconds": seconds,
            "logOptions.sinceTime.nanos": nanos,
            "logOptions.timestamps": timestamps,
            "logOptions.tailLines": tail_lines,
            "logOptions.limitBytes": limit_bytes,
            "logOptions.insecureSkipTLSVerifyBackend": insecure_skip_tls_verify_backend,
            "grep": grep,
            "selector": selector,
        }
        response = requests.get(
            url=url,
            params=log_options,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return V1alpha1LogEntry(**response.json())

    def resubmit_workflow(self, namespace: str, name: str, req: WorkflowResubmitRequest) -> Workflow:
        """Resubmit the named workflow as a new run."""
        url = os.path.join(self.host, "api/v1/workflows/{namespace}/{name}/resubmit").format(
            namespace=namespace, name=name
        )
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.put(
            url=url,
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return Workflow(**response.json())

    def resume_workflow(self, namespace: str, name: str, req: WorkflowResumeRequest) -> Workflow:
        """Resume a previously suspended workflow."""
        url = os.path.join(self.host, "api/v1/workflows/{namespace}/{name}/resume").format(
            namespace=namespace, name=name
        )
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.put(
            url=url,
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return Workflow(**response.json())

    def retry_workflow(self, namespace: str, name: str, req: WorkflowRetryRequest) -> Workflow:
        """Retry the failed steps of the named workflow."""
        url = os.path.join(self.host, "api/v1/workflows/{namespace}/{name}/retry").format(
            namespace=namespace, name=name
        )
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.put(
            url=url,
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return Workflow(**response.json())

    def set_workflow(self, namespace: str, name: str, req: WorkflowSetRequest) -> Workflow:
        """Apply a 'set' operation to the named workflow (e.g. output parameters)."""
        url = os.path.join(self.host, "api/v1/workflows/{namespace}/{name}/set").format(
            namespace=namespace, name=name
        )
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.put(
            url=url,
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return Workflow(**response.json())

    def stop_workflow(self, namespace: str, name: str, req: WorkflowStopRequest) -> Workflow:
        """Stop the named workflow, letting exit handlers run."""
        url = os.path.join(self.host, "api/v1/workflows/{namespace}/{name}/stop").format(
            namespace=namespace, name=name
        )
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.put(
            url=url,
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return Workflow(**response.json())

    def suspend_workflow(self, namespace: str, name: str, req: WorkflowSuspendRequest) -> Workflow:
        """Suspend execution of the named workflow."""
        url = os.path.join(self.host, "api/v1/workflows/{namespace}/{name}/suspend").format(
            namespace=namespace, name=name
        )
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.put(
            url=url,
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return Workflow(**response.json())

    def terminate_workflow(self, namespace: str, name: str, req: WorkflowTerminateRequest) -> Workflow:
        """Terminate the named workflow immediately."""
        url = os.path.join(self.host, "api/v1/workflows/{namespace}/{name}/terminate").format(
            namespace=namespace, name=name
        )
        payload = req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        )
        response = requests.put(
            url=url,
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=payload,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return Workflow(**response.json())

    def pod_logs(
        self,
        namespace: str,
        name: str,
        pod_name: str,
        container: Optional[str] = None,
        follow: Optional[bool] = None,
        previous: Optional[bool] = None,
        since_seconds: Optional[str] = None,
        seconds: Optional[str] = None,
        nanos: Optional[int] = None,
        timestamps: Optional[bool] = None,
        tail_lines: Optional[str] = None,
        limit_bytes: Optional[str] = None,
        insecure_skip_tls_verify_backend: Optional[bool] = None,
        grep: Optional[str] = None,
        selector: Optional[str] = None,
    ) -> V1alpha1LogEntry:
        """DEPRECATED: Cannot work via HTTP if podName is an empty string. Use WorkflowLogs."""
        url = os.path.join(self.host, "api/v1/workflows/{namespace}/{name}/{podName}/log").format(
            namespace=namespace, name=name, podName=pod_name
        )
        log_options = {
            "logOptions.container": container,
            "logOptions.follow": follow,
            "logOptions.previous": previous,
            "logOptions.sinceSeconds": since_seconds,
            "logOptions.sinceTime.seconds": seconds,
            "logOptions.sinceTime.nanos": nanos,
            "logOptions.timestamps": timestamps,
            "logOptions.tailLines": tail_lines,
            "logOptions.limitBytes": limit_bytes,
            "logOptions.insecureSkipTLSVerifyBackend": insecure_skip_tls_verify_backend,
            "grep": grep,
            "selector": selector,
        }
        response = requests.get(
            url=url,
            params=log_options,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )
        if not response.ok:
            raise Exception(f"Server returned status code {response.status_code} with error: {response.json()}")
        return V1alpha1LogEntry(**response.json())

    def get_artifact_file(
        self,
        namespace: str,
        id_discriminator: str,
        id_: str,
        node_id: str,
        artifact_name: str,
        artifact_discriminator: str,
    ) -> str:
        """Get an artifact file's content as text.

        Fix: ``str(resp.content)`` returned the *repr* of the bytes (a string
        like ``"b'...'"``); decode the payload instead so callers receive the
        actual artifact text. Raises UnicodeDecodeError for non-UTF-8 content.
        """
        resp = requests.get(
            url=os.path.join(
                self.host,
                "artifact-files/{namespace}/{idDiscriminator}/{id}/{nodeId}/{artifactDiscriminator}/{artifactName}",
            ).format(
                namespace=namespace,
                idDiscriminator=id_discriminator,
                id=id_,
                nodeId=node_id,
                artifactName=artifact_name,
                artifactDiscriminator=artifact_discriminator,
            ),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )

        if resp.ok:
            return resp.content.decode()
        else:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

    def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str:
        """Get an output artifact by workflow UID.

        Fix: ``str(resp.content)`` returned the *repr* of the bytes; decode the
        payload instead so callers receive the actual artifact text.
        """
        resp = requests.get(
            url=os.path.join(self.host, "artifacts-by-uid/{uid}/{nodeId}/{artifactName}").format(
                uid=uid, nodeId=node_id, artifactName=artifact_name
            ),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )

        if resp.ok:
            return resp.content.decode()
        else:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

    def get_output_artifact(self, namespace: str, name: str, node_id: str, artifact_name: str) -> str:
        """Get an output artifact.

        Fix: ``str(resp.content)`` returned the *repr* of the bytes; decode the
        payload instead so callers receive the actual artifact text.
        """
        resp = requests.get(
            url=os.path.join(self.host, "artifacts/{namespace}/{name}/{nodeId}/{artifactName}").format(
                namespace=namespace, name=name, nodeId=node_id, artifactName=artifact_name
            ),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )

        if resp.ok:
            return resp.content.decode()
        else:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

    def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str:
        """Get an input artifact by workflow UID.

        Fix: ``str(resp.content)`` returned the *repr* of the bytes; decode the
        payload instead so callers receive the actual artifact text.
        """
        resp = requests.get(
            url=os.path.join(self.host, "input-artifacts-by-uid/{uid}/{nodeId}/{artifactName}").format(
                uid=uid, nodeId=node_id, artifactName=artifact_name
            ),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )

        if resp.ok:
            return resp.content.decode()
        else:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

    def get_input_artifact(self, namespace: str, name: str, node_id: str, artifact_name: str) -> str:
        """Get an input artifact.

        Fix: ``str(resp.content)`` returned the *repr* of the bytes; decode the
        payload instead so callers receive the actual artifact text.
        """
        resp = requests.get(
            url=os.path.join(self.host, "input-artifacts/{namespace}/{name}/{nodeId}/{artifactName}").format(
                namespace=namespace, name=name, nodeId=node_id, artifactName=artifact_name
            ),
            params=None,
            headers={"Authorization": f"Bearer {self.token}"},
            data=None,
            verify=self.verify_ssl,
        )

        if resp.ok:
            return resp.content.decode()
        else:
            raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

host instance-attribute

host = cast(str, host or global_config.host)

namespace instance-attribute

namespace = namespace or global_config.namespace

token instance-attribute

token = token or global_config.token

verify_ssl instance-attribute

verify_ssl = verify_ssl or global_config.verify_ssl

create_cluster_workflow_template

create_cluster_workflow_template(req)
Source code in hera/workflows/service.py
def create_cluster_workflow_template(self, req: ClusterWorkflowTemplateCreateRequest) -> ClusterWorkflowTemplate:
    resp = requests.post(
        url=os.path.join(self.host, "api/v1/cluster-workflow-templates"),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return ClusterWorkflowTemplate(**resp.json())
    else:
        raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

create_cron_workflow

create_cron_workflow(namespace, req)
Source code in hera/workflows/service.py
def create_cron_workflow(self, namespace: str, req: CreateCronWorkflowRequest) -> CronWorkflow:
    resp = requests.post(
        url=os.path.join(self.host, "api/v1/cron-workflows/{namespace}").format(namespace=namespace),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return CronWorkflow(**resp.json())
    else:
        raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

create_workflow

create_workflow(namespace, req)
Source code in hera/workflows/service.py
def create_workflow(self, namespace: str, req: WorkflowCreateRequest) -> Workflow:
    resp = requests.post(
        url=os.path.join(self.host, "api/v1/workflows/{namespace}").format(namespace=namespace),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    else:
        raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

create_workflow_template

create_workflow_template(namespace, req)
Source code in hera/workflows/service.py
def create_workflow_template(self, namespace: str, req: WorkflowTemplateCreateRequest) -> WorkflowTemplate:
    """Create a workflow template in the given namespace.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.post(
        url=os.path.join(self.host, "api/v1/workflow-templates/{namespace}").format(namespace=namespace),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # serialize the request model, dropping unset/default/None fields
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return WorkflowTemplate(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

delete_archived_workflow

delete_archived_workflow(uid)
Source code in hera/workflows/service.py
def delete_archived_workflow(self, uid: str) -> ArchivedWorkflowDeletedResponse:
    """Delete the archived workflow identified by `uid`.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.delete(
        url=os.path.join(self.host, "api/v1/archived-workflows/{uid}").format(uid=uid),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        # server returns no payload of interest; signal success with an empty response model
        return ArchivedWorkflowDeletedResponse()
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

delete_cluster_workflow_template

delete_cluster_workflow_template(name, grace_period_seconds=None, uid=None, resource_version=None, orphan_dependents=None, propagation_policy=None, dry_run=None)
Source code in hera/workflows/service.py
def delete_cluster_workflow_template(
    self,
    name: str,
    grace_period_seconds: Optional[str] = None,
    uid: Optional[str] = None,
    resource_version: Optional[str] = None,
    orphan_dependents: Optional[bool] = None,
    propagation_policy: Optional[str] = None,
    dry_run: Optional[list] = None,
) -> ClusterWorkflowTemplateDeleteResponse:
    """Delete the named cluster workflow template, forwarding Kubernetes delete options.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.delete(
        url=os.path.join(self.host, "api/v1/cluster-workflow-templates/{name}").format(name=name),
        params={
            "deleteOptions.gracePeriodSeconds": grace_period_seconds,
            "deleteOptions.preconditions.uid": uid,
            "deleteOptions.preconditions.resourceVersion": resource_version,
            "deleteOptions.orphanDependents": orphan_dependents,
            "deleteOptions.propagationPolicy": propagation_policy,
            "deleteOptions.dryRun": dry_run,
        },
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return ClusterWorkflowTemplateDeleteResponse()
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

delete_cron_workflow

delete_cron_workflow(namespace, name, grace_period_seconds=None, uid=None, resource_version=None, orphan_dependents=None, propagation_policy=None, dry_run=None)
Source code in hera/workflows/service.py
def delete_cron_workflow(
    self,
    namespace: str,
    name: str,
    grace_period_seconds: Optional[str] = None,
    uid: Optional[str] = None,
    resource_version: Optional[str] = None,
    orphan_dependents: Optional[bool] = None,
    propagation_policy: Optional[str] = None,
    dry_run: Optional[list] = None,
) -> CronWorkflowDeletedResponse:
    """Delete the named cron workflow in `namespace`, forwarding Kubernetes delete options.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.delete(
        url=os.path.join(self.host, "api/v1/cron-workflows/{namespace}/{name}").format(
            namespace=namespace, name=name
        ),
        params={
            "deleteOptions.gracePeriodSeconds": grace_period_seconds,
            "deleteOptions.preconditions.uid": uid,
            "deleteOptions.preconditions.resourceVersion": resource_version,
            "deleteOptions.orphanDependents": orphan_dependents,
            "deleteOptions.propagationPolicy": propagation_policy,
            "deleteOptions.dryRun": dry_run,
        },
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return CronWorkflowDeletedResponse()
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

delete_workflow

delete_workflow(namespace, name, grace_period_seconds=None, uid=None, resource_version=None, orphan_dependents=None, propagation_policy=None, dry_run=None, force=None)
Source code in hera/workflows/service.py
def delete_workflow(
    self,
    namespace: str,
    name: str,
    grace_period_seconds: Optional[str] = None,
    uid: Optional[str] = None,
    resource_version: Optional[str] = None,
    orphan_dependents: Optional[bool] = None,
    propagation_policy: Optional[str] = None,
    dry_run: Optional[list] = None,
    force: Optional[bool] = None,
) -> WorkflowDeleteResponse:
    """Delete the named workflow in `namespace`, forwarding Kubernetes delete options.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.delete(
        url=os.path.join(self.host, "api/v1/workflows/{namespace}/{name}").format(namespace=namespace, name=name),
        params={
            "deleteOptions.gracePeriodSeconds": grace_period_seconds,
            "deleteOptions.preconditions.uid": uid,
            "deleteOptions.preconditions.resourceVersion": resource_version,
            "deleteOptions.orphanDependents": orphan_dependents,
            "deleteOptions.propagationPolicy": propagation_policy,
            "deleteOptions.dryRun": dry_run,
            "force": force,
        },
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return WorkflowDeleteResponse()
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

delete_workflow_template

delete_workflow_template(namespace, name, grace_period_seconds=None, uid=None, resource_version=None, orphan_dependents=None, propagation_policy=None, dry_run=None)
Source code in hera/workflows/service.py
def delete_workflow_template(
    self,
    namespace: str,
    name: str,
    grace_period_seconds: Optional[str] = None,
    uid: Optional[str] = None,
    resource_version: Optional[str] = None,
    orphan_dependents: Optional[bool] = None,
    propagation_policy: Optional[str] = None,
    dry_run: Optional[list] = None,
) -> WorkflowTemplateDeleteResponse:
    """Delete the named workflow template in `namespace`, forwarding Kubernetes delete options.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.delete(
        url=os.path.join(self.host, "api/v1/workflow-templates/{namespace}/{name}").format(
            namespace=namespace, name=name
        ),
        params={
            "deleteOptions.gracePeriodSeconds": grace_period_seconds,
            "deleteOptions.preconditions.uid": uid,
            "deleteOptions.preconditions.resourceVersion": resource_version,
            "deleteOptions.orphanDependents": orphan_dependents,
            "deleteOptions.propagationPolicy": propagation_policy,
            "deleteOptions.dryRun": dry_run,
        },
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return WorkflowTemplateDeleteResponse()
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_archived_workflow

get_archived_workflow(uid)
Source code in hera/workflows/service.py
def get_archived_workflow(self, uid: str) -> Workflow:
    """Fetch the archived workflow identified by `uid`.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/archived-workflows/{uid}").format(uid=uid),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_artifact_file

get_artifact_file(namespace, id_discriminator, id_, node_id, artifact_name, artifact_discriminator)

Get an artifact.

Source code in hera/workflows/service.py
def get_artifact_file(
    self,
    namespace: str,
    id_discriminator: str,
    id_: str,
    node_id: str,
    artifact_name: str,
    artifact_discriminator: str,
) -> str:
    """Get an artifact.

    Returns the raw artifact file content (stringified response bytes).

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(
            self.host,
            "artifact-files/{namespace}/{idDiscriminator}/{id}/{nodeId}/{artifactDiscriminator}/{artifactName}",
        ).format(
            namespace=namespace,
            idDiscriminator=id_discriminator,
            id=id_,
            nodeId=node_id,
            artifactName=artifact_name,
            artifactDiscriminator=artifact_discriminator,
        ),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return str(resp.content)
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_cluster_workflow_template

get_cluster_workflow_template(name, resource_version=None)
Source code in hera/workflows/service.py
def get_cluster_workflow_template(
    self, name: str, resource_version: Optional[str] = None
) -> ClusterWorkflowTemplate:
    """Fetch the named cluster workflow template, optionally at a specific resource version.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/cluster-workflow-templates/{name}").format(name=name),
        params={"getOptions.resourceVersion": resource_version},
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return ClusterWorkflowTemplate(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_cron_workflow

get_cron_workflow(namespace, name, resource_version=None)
Source code in hera/workflows/service.py
def get_cron_workflow(self, namespace: str, name: str, resource_version: Optional[str] = None) -> CronWorkflow:
    """Fetch the named cron workflow in `namespace`, optionally at a specific resource version.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/cron-workflows/{namespace}/{name}").format(
            namespace=namespace, name=name
        ),
        params={"getOptions.resourceVersion": resource_version},
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return CronWorkflow(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_info

get_info()
Source code in hera/workflows/service.py
def get_info(self) -> InfoResponse:
    """Fetch server info from the `api/v1/info` endpoint.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/info"),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return InfoResponse()
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_input_artifact

get_input_artifact(namespace, name, node_id, artifact_name)

Get an input artifact.

Source code in hera/workflows/service.py
def get_input_artifact(self, namespace: str, name: str, node_id: str, artifact_name: str) -> str:
    """Get an input artifact.

    Returns the raw artifact content (stringified response bytes).

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "input-artifacts/{namespace}/{name}/{nodeId}/{artifactName}").format(
            namespace=namespace, name=name, nodeId=node_id, artifactName=artifact_name
        ),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return str(resp.content)
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_input_artifact_by_uid

get_input_artifact_by_uid(uid, node_id, artifact_name)

Get an input artifact by UID.

Source code in hera/workflows/service.py
def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str:
    """Get an input artifact by UID.

    Returns the raw artifact content (stringified response bytes).

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "input-artifacts-by-uid/{uid}/{nodeId}/{artifactName}").format(
            uid=uid, nodeId=node_id, artifactName=artifact_name
        ),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return str(resp.content)
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_output_artifact

get_output_artifact(namespace, name, node_id, artifact_name)

Get an output artifact.

Source code in hera/workflows/service.py
def get_output_artifact(self, namespace: str, name: str, node_id: str, artifact_name: str) -> str:
    """Get an output artifact.

    Returns the raw artifact content (stringified response bytes).

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "artifacts/{namespace}/{name}/{nodeId}/{artifactName}").format(
            namespace=namespace, name=name, nodeId=node_id, artifactName=artifact_name
        ),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return str(resp.content)
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_output_artifact_by_uid

get_output_artifact_by_uid(uid, node_id, artifact_name)

Get an output artifact by UID.

Source code in hera/workflows/service.py
def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str:
    """Get an output artifact by UID.

    Returns the raw artifact content (stringified response bytes).

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "artifacts-by-uid/{uid}/{nodeId}/{artifactName}").format(
            uid=uid, nodeId=node_id, artifactName=artifact_name
        ),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return str(resp.content)
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_user_info

get_user_info()
Source code in hera/workflows/service.py
def get_user_info(self) -> GetUserInfoResponse:
    """Fetch information about the authenticated user from `api/v1/userinfo`.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/userinfo"),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return GetUserInfoResponse()
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_version

get_version()
Source code in hera/workflows/service.py
def get_version(self) -> Version:
    """Fetch the server version from `api/v1/version`.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/version"),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Version(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_workflow

get_workflow(namespace, name, resource_version=None, fields=None)
Source code in hera/workflows/service.py
def get_workflow(
    self, namespace: str, name: str, resource_version: Optional[str] = None, fields: Optional[str] = None
) -> Workflow:
    """Fetch the named workflow in `namespace`, optionally filtered by resource version and fields.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/workflows/{namespace}/{name}").format(namespace=namespace, name=name),
        params={"getOptions.resourceVersion": resource_version, "fields": fields},
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

get_workflow_template

get_workflow_template(namespace, name, resource_version=None)
Source code in hera/workflows/service.py
def get_workflow_template(
    self, namespace: str, name: str, resource_version: Optional[str] = None
) -> WorkflowTemplate:
    """Fetch the named workflow template in `namespace`, optionally at a specific resource version.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/workflow-templates/{namespace}/{name}").format(
            namespace=namespace, name=name
        ),
        params={"getOptions.resourceVersion": resource_version},
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return WorkflowTemplate(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

lint_cluster_workflow_template

lint_cluster_workflow_template(req)
Source code in hera/workflows/service.py
def lint_cluster_workflow_template(self, req: ClusterWorkflowTemplateLintRequest) -> ClusterWorkflowTemplate:
    """Lint a cluster workflow template via the server without creating it.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.post(
        url=os.path.join(self.host, "api/v1/cluster-workflow-templates/lint"),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # serialize the request model, dropping unset/default/None fields
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return ClusterWorkflowTemplate(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

lint_cron_workflow

lint_cron_workflow(namespace, req)
Source code in hera/workflows/service.py
def lint_cron_workflow(self, namespace: str, req: LintCronWorkflowRequest) -> CronWorkflow:
    """Lint a cron workflow in `namespace` via the server without creating it.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.post(
        url=os.path.join(self.host, "api/v1/cron-workflows/{namespace}/lint").format(namespace=namespace),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # serialize the request model, dropping unset/default/None fields
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return CronWorkflow(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

lint_workflow

lint_workflow(namespace, req)
Source code in hera/workflows/service.py
def lint_workflow(self, namespace: str, req: WorkflowLintRequest) -> Workflow:
    """Lint a workflow in `namespace` via the server without creating it.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.post(
        url=os.path.join(self.host, "api/v1/workflows/{namespace}/lint").format(namespace=namespace),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # serialize the request model, dropping unset/default/None fields
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

lint_workflow_template

lint_workflow_template(namespace, req)
Source code in hera/workflows/service.py
def lint_workflow_template(self, namespace: str, req: WorkflowTemplateLintRequest) -> WorkflowTemplate:
    """Lint a workflow template in `namespace` via the server without creating it.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.post(
        url=os.path.join(self.host, "api/v1/workflow-templates/{namespace}/lint").format(namespace=namespace),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # serialize the request model, dropping unset/default/None fields
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return WorkflowTemplate(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

list_archived_workflow_label_keys

list_archived_workflow_label_keys()
Source code in hera/workflows/service.py
def list_archived_workflow_label_keys(self) -> LabelKeys:
    """List the label keys present across archived workflows.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/archived-workflows-label-keys"),
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return LabelKeys(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

list_archived_workflow_label_values

list_archived_workflow_label_values(label_selector=None, field_selector=None, watch=None, allow_watch_bookmarks=None, resource_version=None, resource_version_match=None, timeout_seconds=None, limit=None, continue_=None)
Source code in hera/workflows/service.py
def list_archived_workflow_label_values(
    self,
    label_selector: Optional[str] = None,
    field_selector: Optional[str] = None,
    watch: Optional[bool] = None,
    allow_watch_bookmarks: Optional[bool] = None,
    resource_version: Optional[str] = None,
    resource_version_match: Optional[str] = None,
    timeout_seconds: Optional[str] = None,
    limit: Optional[str] = None,
    continue_: Optional[str] = None,
) -> LabelValues:
    """List archived workflow label values, forwarding Kubernetes list options.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/archived-workflows-label-values"),
        params={
            "listOptions.labelSelector": label_selector,
            "listOptions.fieldSelector": field_selector,
            "listOptions.watch": watch,
            "listOptions.allowWatchBookmarks": allow_watch_bookmarks,
            "listOptions.resourceVersion": resource_version,
            "listOptions.resourceVersionMatch": resource_version_match,
            "listOptions.timeoutSeconds": timeout_seconds,
            "listOptions.limit": limit,
            "listOptions.continue": continue_,
        },
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return LabelValues(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

list_archived_workflows

list_archived_workflows(label_selector=None, field_selector=None, watch=None, allow_watch_bookmarks=None, resource_version=None, resource_version_match=None, timeout_seconds=None, limit=None, continue_=None, name_prefix=None)
Source code in hera/workflows/service.py
def list_archived_workflows(
    self,
    label_selector: Optional[str] = None,
    field_selector: Optional[str] = None,
    watch: Optional[bool] = None,
    allow_watch_bookmarks: Optional[bool] = None,
    resource_version: Optional[str] = None,
    resource_version_match: Optional[str] = None,
    timeout_seconds: Optional[str] = None,
    limit: Optional[str] = None,
    continue_: Optional[str] = None,
    name_prefix: Optional[str] = None,
) -> WorkflowList:
    """List archived workflows, forwarding Kubernetes list options and an optional name prefix.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/archived-workflows"),
        params={
            "listOptions.labelSelector": label_selector,
            "listOptions.fieldSelector": field_selector,
            "listOptions.watch": watch,
            "listOptions.allowWatchBookmarks": allow_watch_bookmarks,
            "listOptions.resourceVersion": resource_version,
            "listOptions.resourceVersionMatch": resource_version_match,
            "listOptions.timeoutSeconds": timeout_seconds,
            "listOptions.limit": limit,
            "listOptions.continue": continue_,
            "namePrefix": name_prefix,
        },
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return WorkflowList(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

list_cluster_workflow_templates

list_cluster_workflow_templates(label_selector=None, field_selector=None, watch=None, allow_watch_bookmarks=None, resource_version=None, resource_version_match=None, timeout_seconds=None, limit=None, continue_=None)
Source code in hera/workflows/service.py
def list_cluster_workflow_templates(
    self,
    label_selector: Optional[str] = None,
    field_selector: Optional[str] = None,
    watch: Optional[bool] = None,
    allow_watch_bookmarks: Optional[bool] = None,
    resource_version: Optional[str] = None,
    resource_version_match: Optional[str] = None,
    timeout_seconds: Optional[str] = None,
    limit: Optional[str] = None,
    continue_: Optional[str] = None,
) -> ClusterWorkflowTemplateList:
    """List cluster workflow templates, forwarding Kubernetes list options.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/cluster-workflow-templates"),
        params={
            "listOptions.labelSelector": label_selector,
            "listOptions.fieldSelector": field_selector,
            "listOptions.watch": watch,
            "listOptions.allowWatchBookmarks": allow_watch_bookmarks,
            "listOptions.resourceVersion": resource_version,
            "listOptions.resourceVersionMatch": resource_version_match,
            "listOptions.timeoutSeconds": timeout_seconds,
            "listOptions.limit": limit,
            "listOptions.continue": continue_,
        },
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return ClusterWorkflowTemplateList(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

list_cron_workflows

list_cron_workflows(namespace, label_selector=None, field_selector=None, watch=None, allow_watch_bookmarks=None, resource_version=None, resource_version_match=None, timeout_seconds=None, limit=None, continue_=None)
Source code in hera/workflows/service.py
def list_cron_workflows(
    self,
    namespace: str,
    label_selector: Optional[str] = None,
    field_selector: Optional[str] = None,
    watch: Optional[bool] = None,
    allow_watch_bookmarks: Optional[bool] = None,
    resource_version: Optional[str] = None,
    resource_version_match: Optional[str] = None,
    timeout_seconds: Optional[str] = None,
    limit: Optional[str] = None,
    continue_: Optional[str] = None,
) -> CronWorkflowList:
    """List cron workflows in `namespace`, forwarding Kubernetes list options.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/cron-workflows/{namespace}").format(namespace=namespace),
        params={
            "listOptions.labelSelector": label_selector,
            "listOptions.fieldSelector": field_selector,
            "listOptions.watch": watch,
            "listOptions.allowWatchBookmarks": allow_watch_bookmarks,
            "listOptions.resourceVersion": resource_version,
            "listOptions.resourceVersionMatch": resource_version_match,
            "listOptions.timeoutSeconds": timeout_seconds,
            "listOptions.limit": limit,
            "listOptions.continue": continue_,
        },
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return CronWorkflowList(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

list_workflow_templates

list_workflow_templates(namespace, label_selector=None, field_selector=None, watch=None, allow_watch_bookmarks=None, resource_version=None, resource_version_match=None, timeout_seconds=None, limit=None, continue_=None)
Source code in hera/workflows/service.py
def list_workflow_templates(
    self,
    namespace: str,
    label_selector: Optional[str] = None,
    field_selector: Optional[str] = None,
    watch: Optional[bool] = None,
    allow_watch_bookmarks: Optional[bool] = None,
    resource_version: Optional[str] = None,
    resource_version_match: Optional[str] = None,
    timeout_seconds: Optional[str] = None,
    limit: Optional[str] = None,
    continue_: Optional[str] = None,
) -> WorkflowTemplateList:
    """List workflow templates in `namespace`, forwarding Kubernetes list options.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/workflow-templates/{namespace}").format(namespace=namespace),
        params={
            "listOptions.labelSelector": label_selector,
            "listOptions.fieldSelector": field_selector,
            "listOptions.watch": watch,
            "listOptions.allowWatchBookmarks": allow_watch_bookmarks,
            "listOptions.resourceVersion": resource_version,
            "listOptions.resourceVersionMatch": resource_version_match,
            "listOptions.timeoutSeconds": timeout_seconds,
            "listOptions.limit": limit,
            "listOptions.continue": continue_,
        },
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return WorkflowTemplateList(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

list_workflows

list_workflows(namespace, label_selector=None, field_selector=None, watch=None, allow_watch_bookmarks=None, resource_version=None, resource_version_match=None, timeout_seconds=None, limit=None, continue_=None, fields=None)
Source code in hera/workflows/service.py
def list_workflows(
    self,
    namespace: str,
    label_selector: Optional[str] = None,
    field_selector: Optional[str] = None,
    watch: Optional[bool] = None,
    allow_watch_bookmarks: Optional[bool] = None,
    resource_version: Optional[str] = None,
    resource_version_match: Optional[str] = None,
    timeout_seconds: Optional[str] = None,
    limit: Optional[str] = None,
    continue_: Optional[str] = None,
    fields: Optional[str] = None,
) -> WorkflowList:
    """List workflows in `namespace`, forwarding Kubernetes list options and a field filter.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/workflows/{namespace}").format(namespace=namespace),
        params={
            "listOptions.labelSelector": label_selector,
            "listOptions.fieldSelector": field_selector,
            "listOptions.watch": watch,
            "listOptions.allowWatchBookmarks": allow_watch_bookmarks,
            "listOptions.resourceVersion": resource_version,
            "listOptions.resourceVersionMatch": resource_version_match,
            "listOptions.timeoutSeconds": timeout_seconds,
            "listOptions.limit": limit,
            "listOptions.continue": continue_,
            "fields": fields,
        },
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return WorkflowList(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

pod_logs

pod_logs(namespace, name, pod_name, container=None, follow=None, previous=None, since_seconds=None, seconds=None, nanos=None, timestamps=None, tail_lines=None, limit_bytes=None, insecure_skip_tls_verify_backend=None, grep=None, selector=None)

DEPRECATED: Cannot work via HTTP if podName is an empty string. Use WorkflowLogs.

Source code in hera/workflows/service.py
def pod_logs(
    self,
    namespace: str,
    name: str,
    pod_name: str,
    container: Optional[str] = None,
    follow: Optional[bool] = None,
    previous: Optional[bool] = None,
    since_seconds: Optional[str] = None,
    seconds: Optional[str] = None,
    nanos: Optional[int] = None,
    timestamps: Optional[bool] = None,
    tail_lines: Optional[str] = None,
    limit_bytes: Optional[str] = None,
    insecure_skip_tls_verify_backend: Optional[bool] = None,
    grep: Optional[str] = None,
    selector: Optional[str] = None,
) -> V1alpha1LogEntry:
    """DEPRECATED: Cannot work via HTTP if podName is an empty string. Use WorkflowLogs.

    Fetches logs for a single pod of the named workflow, forwarding pod log options.

    Raises:
        Exception: if the server responds with a non-OK status code.
    """
    resp = requests.get(
        url=os.path.join(self.host, "api/v1/workflows/{namespace}/{name}/{podName}/log").format(
            namespace=namespace, name=name, podName=pod_name
        ),
        params={
            "logOptions.container": container,
            "logOptions.follow": follow,
            "logOptions.previous": previous,
            "logOptions.sinceSeconds": since_seconds,
            "logOptions.sinceTime.seconds": seconds,
            "logOptions.sinceTime.nanos": nanos,
            "logOptions.timestamps": timestamps,
            "logOptions.tailLines": tail_lines,
            "logOptions.limitBytes": limit_bytes,
            "logOptions.insecureSkipTLSVerifyBackend": insecure_skip_tls_verify_backend,
            "grep": grep,
            "selector": selector,
        },
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return V1alpha1LogEntry(**resp.json())
    # Use `resp.text` rather than `resp.json()`: a non-JSON error body would
    # otherwise raise a decode error and mask the real server failure.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.text}")

resubmit_archived_workflow

resubmit_archived_workflow(uid, req)
Source code in hera/workflows/service.py
def resubmit_archived_workflow(self, uid: str, req: ResubmitArchivedWorkflowRequest) -> Workflow:
    """Resubmit the archived workflow identified by `uid` and return the new live workflow.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/archived-workflows/{uid}/resubmit"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

resubmit_workflow

resubmit_workflow(namespace, name, req)
Source code in hera/workflows/service.py
def resubmit_workflow(self, namespace: str, name: str, req: WorkflowResubmitRequest) -> Workflow:
    """Resubmit workflow `name` in `namespace` and return the resulting workflow.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/workflows/{namespace}/{name}/resubmit"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

resume_cron_workflow

resume_cron_workflow(namespace, name, req)
Source code in hera/workflows/service.py
def resume_cron_workflow(self, namespace: str, name: str, req: CronWorkflowResumeRequest) -> CronWorkflow:
    """Resume the suspended cron workflow `name` in `namespace`.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/cron-workflows/{namespace}/{name}/resume"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return CronWorkflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

resume_workflow

resume_workflow(namespace, name, req)
Source code in hera/workflows/service.py
def resume_workflow(self, namespace: str, name: str, req: WorkflowResumeRequest) -> Workflow:
    """Resume the suspended workflow `name` in `namespace` and return it.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/workflows/{namespace}/{name}/resume"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

retry_archived_workflow

retry_archived_workflow(uid, req)
Source code in hera/workflows/service.py
def retry_archived_workflow(self, uid: str, req: RetryArchivedWorkflowRequest) -> Workflow:
    """Retry the archived workflow identified by `uid` and return the new live workflow.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/archived-workflows/{uid}/retry"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

retry_workflow

retry_workflow(namespace, name, req)
Source code in hera/workflows/service.py
def retry_workflow(self, namespace: str, name: str, req: WorkflowRetryRequest) -> Workflow:
    """Retry the failed workflow `name` in `namespace` and return it.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/workflows/{namespace}/{name}/retry"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

set_workflow

set_workflow(namespace, name, req)
Source code in hera/workflows/service.py
def set_workflow(self, namespace: str, name: str, req: WorkflowSetRequest) -> Workflow:
    """Apply the given set request to workflow `name` in `namespace` and return it.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/workflows/{namespace}/{name}/set"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

stop_workflow

stop_workflow(namespace, name, req)
Source code in hera/workflows/service.py
def stop_workflow(self, namespace: str, name: str, req: WorkflowStopRequest) -> Workflow:
    """Stop the running workflow `name` in `namespace` and return it.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/workflows/{namespace}/{name}/stop"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

submit_workflow

submit_workflow(namespace, req)
Source code in hera/workflows/service.py
def submit_workflow(self, namespace: str, req: WorkflowSubmitRequest) -> Workflow:
    """Submit a new workflow to `namespace` and return the created workflow.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/workflows/{namespace}/submit"
    resp = requests.post(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

suspend_cron_workflow

suspend_cron_workflow(namespace, name, req)
Source code in hera/workflows/service.py
def suspend_cron_workflow(self, namespace: str, name: str, req: CronWorkflowSuspendRequest) -> CronWorkflow:
    """Suspend the cron workflow `name` in `namespace` and return it.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/cron-workflows/{namespace}/{name}/suspend"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return CronWorkflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

suspend_workflow

suspend_workflow(namespace, name, req)
Source code in hera/workflows/service.py
def suspend_workflow(self, namespace: str, name: str, req: WorkflowSuspendRequest) -> Workflow:
    """Suspend the running workflow `name` in `namespace` and return it.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/workflows/{namespace}/{name}/suspend"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

terminate_workflow

terminate_workflow(namespace, name, req)
Source code in hera/workflows/service.py
def terminate_workflow(self, namespace: str, name: str, req: WorkflowTerminateRequest) -> Workflow:
    """Terminate the workflow `name` in `namespace` and return it.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/workflows/{namespace}/{name}/terminate"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return Workflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

update_cluster_workflow_template

update_cluster_workflow_template(name, req)
Source code in hera/workflows/service.py
def update_cluster_workflow_template(
    self, name: str, req: ClusterWorkflowTemplateUpdateRequest
) -> ClusterWorkflowTemplate:
    """Update the cluster workflow template `name` and return the updated template.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/cluster-workflow-templates/{name}"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return ClusterWorkflowTemplate(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

update_cron_workflow

update_cron_workflow(namespace, name, req)
Source code in hera/workflows/service.py
def update_cron_workflow(self, namespace: str, name: str, req: UpdateCronWorkflowRequest) -> CronWorkflow:
    """Update the cron workflow `name` in `namespace` and return the updated resource.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/cron-workflows/{namespace}/{name}"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return CronWorkflow(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

update_workflow_template

update_workflow_template(namespace, name, req)
Source code in hera/workflows/service.py
def update_workflow_template(
    self, namespace: str, name: str, req: WorkflowTemplateUpdateRequest
) -> WorkflowTemplate:
    """Update the workflow template `name` in `namespace` and return the updated template.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/workflow-templates/{namespace}/{name}"
    resp = requests.put(
        url=url,
        params=None,
        headers={"Authorization": f"Bearer {self.token}"},
        # Serialize the request model, omitting unset/None/default fields.
        data=req.json(
            exclude_none=True, by_alias=True, skip_defaults=True, exclude_unset=True, exclude_defaults=True
        ),
        verify=self.verify_ssl,
    )

    if resp.ok:
        return WorkflowTemplate(**resp.json())
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

workflow_logs

workflow_logs(namespace, name, pod_name=None, container=None, follow=None, previous=None, since_seconds=None, seconds=None, nanos=None, timestamps=None, tail_lines=None, limit_bytes=None, insecure_skip_tls_verify_backend=None, grep=None, selector=None)
Source code in hera/workflows/service.py
def workflow_logs(
    self,
    namespace: str,
    name: str,
    pod_name: Optional[str] = None,
    container: Optional[str] = None,
    follow: Optional[bool] = None,
    previous: Optional[bool] = None,
    since_seconds: Optional[str] = None,
    seconds: Optional[str] = None,
    nanos: Optional[int] = None,
    timestamps: Optional[bool] = None,
    tail_lines: Optional[str] = None,
    limit_bytes: Optional[str] = None,
    insecure_skip_tls_verify_backend: Optional[bool] = None,
    grep: Optional[str] = None,
    selector: Optional[str] = None,
) -> V1alpha1LogEntry:
    """Fetch the logs of workflow `name` in `namespace`.

    Unlike `pod_logs`, the pod name is passed as the optional `podName` query
    parameter rather than a path segment, so it may be omitted. The remaining
    optional arguments map one-to-one onto the server's `logOptions.*` query
    parameters; `None` values are passed through untouched.

    Raises an `Exception` carrying the status code and decoded error body on any
    non-2xx response.
    """
    # Build the URL with explicit "/" separators: os.path.join uses the platform
    # path separator, which is "\" on Windows and would yield an invalid URL.
    url = self.host.rstrip("/") + f"/api/v1/workflows/{namespace}/{name}/log"
    resp = requests.get(
        url=url,
        params={
            "podName": pod_name,
            "logOptions.container": container,
            "logOptions.follow": follow,
            "logOptions.previous": previous,
            "logOptions.sinceSeconds": since_seconds,
            "logOptions.sinceTime.seconds": seconds,
            "logOptions.sinceTime.nanos": nanos,
            "logOptions.timestamps": timestamps,
            "logOptions.tailLines": tail_lines,
            "logOptions.limitBytes": limit_bytes,
            "logOptions.insecureSkipTLSVerifyBackend": insecure_skip_tls_verify_backend,
            "grep": grep,
            "selector": selector,
        },
        headers={"Authorization": f"Bearer {self.token}"},
        data=None,
        verify=self.verify_ssl,
    )

    if resp.ok:
        return V1alpha1LogEntry(**resp.json())
    # NOTE(review): assumes the error body is JSON; a non-JSON response would
    # raise a decode error here instead of this Exception.
    raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}")

ZipArchiveStrategy

Source code in hera/workflows/archive.py
class ZipArchiveStrategy(ArchiveStrategy):
    """Archive strategy that stores the artifact as a zip archive."""

    def _build_archive_strategy(self) -> _ModelArchiveStrategy:
        """Return the generated-model representation of this strategy."""
        zip_strategy = _ModelZipStrategy()
        return _ModelArchiveStrategy(zip=zip_strategy)

Comments