SOURCE CODE PYTEST_HELM_CHARTS.CLUSTERS DOCS

"""This module introduces classes for handling different clusters."""
import json
import logging
import shutil
import subprocess  # nosec
from abc import ABC, abstractmethod
from typing import Optional, Any

from pykube import HTTPClient, KubeConfig

logger = logging.getLogger(__name__)


class Cluster(ABC):DOCS
    """Represents an abstract cluster."""

    _kube_client: Optional[HTTPClient]
    kube_config_path: Optional[str]

    def __init__(self, kube_config_path: Optional[str] = None):
        super().__init__()
        self._kube_client = None
        self.kube_config_path = kube_config_path

    @abstractmethodDOCS
    def create(self) -> HTTPClient:
        """Creates an instance of a cluster and returns HTTPClient to connect to it."""
        raise NotImplementedError

    @abstractmethodDOCS
    def destroy(self) -> None:
        """Destroys the cluster created earlier with a call to [create](Cluster.create)."""
        raise NotImplementedError

    @propertyDOCS
    def kube_client(self) -> Optional[HTTPClient]:
        """Returns the HTTP client you can use to access Kubernetes API of the cluster under test.

        Please refer to [pykube](https://pykube.readthedocs.io/en/latest/api/pykube.html) to get docs
        for [HTTPClient](https://pykube.readthedocs.io/en/latest/api/pykube.html#pykube.http.HTTPClient).
        """
        return self._kube_client

    def kubectl(  # noqa: C901DOCS
        self,
        subcmd_string: str,
        std_input: str = "",
        output_format: str = "json",
        use_shell: bool = False,
        **kwargs: str,
    ) -> Any:
        """Execute command by running 'kubectl' binary.

        If your cluster delivers the kube.config file, run a kubectl command against the cluster
        and return the output. Otherwise, exception is raised.

        Args:
            subcmd_string (str): Command to run, like "delete pod abc"
            std_input (str): Use this to pass a manifest file directly as a string (results in 'kubectl [cmd] -f -')
            output_format (str): Option "--output" as passed to 'kubectl'. Default is 'json', make sure to change to ""
                for commands that don't return JSON.
            use_shell: Whether the 'kubectl' command should be invoked directly (when 'False') or wrapped in system
                shell ('True'). 'False' by default.
            kwargs: arbitrary dictionary of options and values that will be passed directly to 'kubectl'

        Returns:
            str: The output printed by 'kubectl', if the command succeeded (exit code was 0)

        Raises:
            subprocess.CalledProcessError: If the command exited with non-zero exit code
        """
        if not self.kube_config_path:
            raise ValueError("'kube_config_path' can't be empty to use 'kubectl'")
        bin_name = "kubectl"
        if shutil.which(bin_name) is None:
            raise ValueError(f"Can't find {bin_name} executable. Please make sure it's available in $PATH.")

        subcmds = subcmd_string.split(" ")
        if len(subcmds) == 0:
            raise ValueError("You need to give at least one kubectl subcommand in the subcmd_string argument.")

        kwargs["kubeconfig"] = self.kube_config_path
        if std_input:
            kwargs["filename"] = "-"
        if output_format:
            kwargs["output"] = output_format
        if subcmds[0].lower() in ["delete", "patch", "label", "annotate"] and "output" in kwargs:
            del kwargs["output"]

        options = {f"--{option}={value}" for option, value in kwargs.items()}

        try:
            cmd = (
                bin_name + " " + subcmd_string + " " + " ".join(options)
                if use_shell
                else [bin_name, *subcmds, *options]
            )
            result = subprocess.check_output(
                cmd, stderr=subprocess.PIPE, encoding="utf-8", input=std_input, shell=use_shell  # nosec
            )
        except subprocess.CalledProcessError as e:
            logger.error(
                f"'kubectl' call returned an error. Exit code: '{e.returncode}', stdout: '{e.stdout}',"
                f"stderr: '{e.stderr}'"
            )
            raise

        # return result from kubectl command:
        # - as parsed json by default, extracting objects list to list
        # - as plain text otherwise

        if "output" in kwargs and kwargs["output"] == "json":
            output_json = json.loads(result)
            if "items" in output_json:
                return output_json["items"]
            return output_json
        return result


class ExistingCluster(Cluster):DOCS
    """Implementation of [Cluster](Cluster) that uses kube.config file to connect to external
    existing cluster.
    """

    def __init__(self, kube_config_path: str) -> None:
        super().__init__(kube_config_path)

    def create(self) -> HTTPClient:DOCS
        kube_config = KubeConfig.from_file(self.kube_config_path)
        self._kube_client = HTTPClient(kube_config)
        return self._kube_client

    def destroy(self) -> None:DOCS
        if self._kube_client is None:
            return
        self._kube_client.session.close()
        self._kube_client = None