SOURCE CODE PYTEST_HELM_CHARTS.K8S.JOB DOCS

from typing import List, Optional

import pykube
from pykube import Job, HTTPClient

from pytest_helm_charts.utils import wait_for_objects_condition, inject_extra


def _job_complete(job: Job) -> bool:
    complete = (
        "status" in job.obj
        and "conditions" in job.obj["status"]
        and len(job.obj["status"]["conditions"]) > 0
        and job.obj["status"]["conditions"][0]["type"] == "Complete"
    )
    return complete


def wait_for_jobs_to_complete(DOCS
    kube_client: HTTPClient, job_names: List[str], jobs_namespace: str, timeout_sec: int, missing_ok: bool = True
) -> List[Job]:
    """
    Block until all the Jobs are complete or timeout is reached.

    Args:
        kube_client: client to use to connect to the k8s cluster
        job_names: a list of Job names to check
        jobs_namespace: namespace where all the Jobs are created (single namespace for all resources)
        timeout_sec: timeout for the call
        missing_ok: when `True`, the function ignores that some of the objects listed in the `job_names`
            don't exist in k8s API and waits for them to show up; when `False`, an
            [ObjectNotFound](pykube.exceptions.ObjectDoesNotExist) exception is raised.

    Returns:
        The list of Job resources with all the objects listed in `job_names` included.

    Raises:
        TimeoutError: when timeout is reached.
        pykube.exceptions.ObjectDoesNotExist: when `missing_ok == False` and one of the objects
            listed in `job_names` can't be found in k8s API

    """
    result = wait_for_objects_condition(
        kube_client, Job, job_names, jobs_namespace, _job_complete, timeout_sec, missing_ok
    )
    return result


def make_job_object(DOCS
    kube_client: pykube.HTTPClient,
    name_prefix: str,
    namespace: str,
    command: List[str],
    image: str = "quay.io/giantswarm/busybox:1.32.0",
    restart_policy: str = "OnFailure",
    backoff_limit: int = 6,  # 6 is the default from k8s
    extra_metadata: Optional[dict] = None,
    extra_spec: Optional[dict] = None,
) -> pykube.Job:
    """
    Creates a Job object according to the parameters.

    Args:
        kube_client: client to use to connect to the k8s cluster
        name_prefix: `metadata.generateName` Job's object property that is used to name Pods
        namespace: Namespace to create the Job in
        command: command to run inside the Job pod
        image: container image to use
        restart_policy: Job's restart policy (as in k8s API)
        backoff_limit: Job's backoff limit (as in k8s API)
        extra_metadata: optional dict that will be merged with the 'metadata:' section of the object
        extra_spec: optional dict that will be merged with the 'spec:' section of the object

    Returns:
        Job object. The Job is not sent for creation to API server.
    """
    obj = inject_extra(
        {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {"generateName": name_prefix, "namespace": namespace},
            "spec": {
                "backoffLimit": backoff_limit,
                "template": {
                    "spec": {
                        "containers": [
                            {
                                "name": f"{name_prefix}job",
                                "image": image,
                                "command": command,
                            }
                        ],
                        "restartPolicy": restart_policy,
                    },
                },
            },
        },
        extra_metadata,
        extra_spec,
    )
    return pykube.Job(kube_client, obj)


def create_job_and_run_to_completion(DOCS
    kube_client: pykube.HTTPClient,
    namespace: str,
    job: pykube.Job,
    timeout_sec: int = 60,
    missing_ok: bool = False,
) -> pykube.Job:
    """
    Creates Job object in k8s and blocks until it is completed.

    Args:
        kube_client: client to use to connect to the k8s cluster
        namespace: Namespace to create the Job in
        job: Job object to run and check.
        timeout_sec: timeout for the call
        missing_ok: when `True`, the function ignores that the Job doesn't yet exist in k8s API
            and waits for it to show up;
            when `False`, an [ObjectNotFound](pykube.exceptions.ObjectDoesNotExist) exception is raised.

    Returns:
        The Job object with refreshed state.

    Raises:
        TimeoutError: when timeout is reached.
        pykube.exceptions.ObjectDoesNotExist: when `missing_ok == False` and the `job` can't be found in k8s API

    """
    job.create()

    wait_for_jobs_to_complete(
        kube_client,
        [job.name],
        namespace,
        timeout_sec,
        missing_ok=missing_ok,
    )

    return job