SOURCE CODE PYTEST_HELM_CHARTS.UTILS DOCS

"""Different utilities required over the whole testing lib."""
import logging
import time
from typing import Dict, Any, List, TypeVar, Callable, Type, Optional, Iterable

import pykube.exceptions
from pykube import HTTPClient

from pytest_helm_charts.clusters import Cluster
from pytest_helm_charts.errors import WaitTimeoutError, ObjectStatusError

DEFAULT_DELETE_TIMEOUT_SEC = 120

YamlDict = Dict[str, Any]

logger = logging.getLogger(__name__)

TNS = TypeVar("TNS", bound=pykube.objects.NamespacedAPIObject)
T = TypeVar("T", bound=pykube.objects.APIObject)
FactoryFunc = Callable[..., T]
MetaFactoryFunc = Callable[[pykube.HTTPClient, List[T]], FactoryFunc]


def wait_for_objects_condition(  # noqa: C901DOCS
    kube_client: HTTPClient,
    obj_type: Type[T],
    obj_names: List[str],
    objs_namespace: Optional[str],
    obj_condition_func: Callable[[T], bool],
    timeout_sec: int,
    missing_ok: bool,
    failure_condition_func: Optional[Callable[[T], bool]] = None,
) -> List[T]:
    """
    Block until all the kubernetes objects of type `obj_type` pass `obj_condition_fun` or timeout is reached.
        If `objs_namespace` is None, objects are treated as cluster-scope, otherwise as namespace-scope. If
        optional `failure_condition_func` is passed, it is executed when objects are refreshed and if it evaluates to
        `True`, it throws `ObjectStatusError` exception.

    Args:
        kube_client: client to use to connect to the k8s cluster
        obj_type: type of the objects to check; they most be derived from
            [APIObject](pykube.objects.APIObject)
        obj_names: a list of object resource names to check; all the objects must pass `obj_condition_fun`
            for this function to end with success
        objs_namespace: namespace where all the resources should be present (single namespace for all resources).
            If 'None', then it is assumed objects passed are cluster-scope
        timeout_sec: timeout for the call
        obj_condition_func: a function that gets one instance of the resource object of type `obj_type`
            and returns boolean showing whether the object meets the condition or not. The call succeeds
            only if this `obj_condition_fun` returns `True` for every object passed in `obj_names`.
        missing_ok: when `True`, the function ignores that some objects listed in the `obj_names`
            don't exist in k8s API and waits for them to show up; when `False`, an
            [ObjectNotExist](pykube.exceptions.ObjectDoesNotExist) exception is raised.
        failure_condition_func: if not None, then each monitored object is passed to this function. If it
            returns `True`, the `ObjectStatusError` is raised.

    Returns:
        The list of object resources with all the objects listed in `obj_names` included in the list.

    Raises:
        TimeoutError: when timeout is reached.
        pykube.exceptions.ObjectDoesNotExist: when `missing_ok == False` and one of the objects
            listed in `obj_names` can't be found in k8s API
        ObjectStatusError: when `failure_condition_func` is not None and any of the objects in `obj_names`
            returned `True` from this function.

    """
    if len(obj_names) == 0:
        raise ValueError("'obj_names' list can't be empty.")

    retries = 0
    all_ready = False
    matching_objs: List[T] = []
    while retries < timeout_sec:
        response = obj_type.objects(kube_client)
        if objs_namespace:
            response = response.filter(namespace=objs_namespace)
        retries += 1
        matching_objs = []
        for name in obj_names:
            try:
                obj = response.get_by_name(name)
                failed = failure_condition_func and failure_condition_func(obj)
                if failed:
                    raise ObjectStatusError(
                        f"Object's '{obj.namespace}/{obj.name}' status shows failure when waiting "
                        f"for the object's condition to pass."
                    )
                matching_objs.append(obj)
            except pykube.exceptions.ObjectDoesNotExist:
                if missing_ok:
                    pass
                else:
                    raise

        all_ready = len(matching_objs) == len(obj_names) and all(obj_condition_func(obj) for obj in matching_objs)
        if all_ready:
            break
        time.sleep(1)

    if not all_ready:
        raise TimeoutError(f"Error waiting for object of type {obj_type} to match the condition.")

    return matching_objs


def inject_extra(
    cr_dict: YamlDict,
    extra_metadata: Optional[YamlDict] = None,
    extra_spec: Optional[YamlDict] = None,
) -> YamlDict:
    if extra_metadata:
        cr_dict["metadata"].update(extra_metadata)
    if extra_spec:
        cr_dict["spec"].update(extra_spec)
    return cr_dict


def delete_and_wait_for_objects(DOCS
    kube_client: HTTPClient,
    obj_type: Type[T],
    objects_to_del: Iterable[T],
    timeout_sec: int = DEFAULT_DELETE_TIMEOUT_SEC,
) -> None:
    """
    For each object in `objects_to_delete`, make an API call to delete it, then wait until the object is gone
    from k8s API server.

    Args:
        kube_client: client to use to connect to the k8s cluster
        obj_type: type of the objects to check; they should be derived from
            [APIObject](pykube.objects.APIObject)
        objects_to_del: iterable of [APIObject](pykube.objects.APIObject) to delete. All objects must be of the same
            specific type.
        timeout_sec: timeout for all the objects in the list to be gone from API server.

    Returns: None

    """
    for kube_object in objects_to_del:
        kube_object.delete()
        obj_name = f"{kube_object.namespace}/{kube_object.name}" if kube_object.namespace else kube_object.name
        logger.debug(f"Deleted object of kind '{obj_type}' named '{obj_name}'.")

    any_exists = True
    times = 0
    while any_exists:
        if times >= timeout_sec:
            raise WaitTimeoutError(f"timeout of {timeout_sec} s exceeded while waiting for objects to be deleted")
        any_exists = False
        for o in objects_to_del:
            objects_method = getattr(obj_type, "objects")
            objects_res = (
                objects_method(kube_client, namespace=o.namespace)
                if "namespace" in o.metadata
                else objects_method(kube_client)
            )
            if objects_res.get_or_none(name=o.name):
                any_exists = True
                time.sleep(1)
                times += 1
                break


def object_factory_helper(
    kube_cluster: Cluster,
    meta_func: MetaFactoryFunc,
    obj_type: Type[T],
    timeout_sec: int = DEFAULT_DELETE_TIMEOUT_SEC,
) -> Iterable[FactoryFunc]:
    created_objects: List[T] = []

    yield meta_func(kube_cluster.kube_client, created_objects)

    delete_and_wait_for_objects(kube_cluster.kube_client, obj_type, created_objects, timeout_sec)