SOURCE CODE PYTEST_HELM_CHARTS.GIANTSWARM_APP_PLATFORM.FIXTURES DOCS

import logging
from typing import List, Iterable

import pytest
from deprecated import deprecated
from pykube import ConfigMap

from pytest_helm_charts.k8s.fixtures import NamespaceFactoryFunc
from pytest_helm_charts.clusters import Cluster
from pytest_helm_charts.giantswarm_app_platform.app import (
    AppFactoryFunc,
    app_factory_func,
    ConfiguredApp,
    AppCR,
)
from pytest_helm_charts.giantswarm_app_platform.app_catalog import (
    AppCatalogFactoryFunc,
    AppCatalogCR,
    app_catalog_factory_func,
)
from pytest_helm_charts.giantswarm_app_platform.catalog import (
    CatalogFactoryFunc,
    CatalogCR,
    catalog_factory_func,
)
from pytest_helm_charts.utils import object_factory_helper, delete_and_wait_for_objects

logger = logging.getLogger(__name__)


@deprecated(version="0.5.3", reason="Please use `catalog_factory` fixture instead.")DOCS
@pytest.fixture(scope="module")
def app_catalog_factory(kube_cluster: Cluster) -> Iterable[AppCatalogFactoryFunc]:
    """
    [Obsolete]
    Please use
    [_catalog_factory](pytest_helm_charts.giantswarm_app_platform.catalog._catalog_factory) instead.

    Return a factory object, that can be used to configure new AppCatalog CRs
    for the 'app-operator' running in the cluster. Fixture's scope is 'module'.
    """
    yield from object_factory_helper(kube_cluster, app_catalog_factory_func, AppCatalogCR)


@pytest.fixture(scope="function")DOCS
def catalog_factory_function_scope(
    kube_cluster: Cluster, namespace_factory: NamespaceFactoryFunc
) -> Iterable[CatalogFactoryFunc]:
    """Return a factory object, that can be used to configure new Catalog CRs
    for the 'app-operator' running in the cluster. Fixture's scope is 'function'."""
    yield from _catalog_factory_impl(kube_cluster, namespace_factory)


@pytest.fixture(scope="module")DOCS
def catalog_factory(kube_cluster: Cluster, namespace_factory: NamespaceFactoryFunc) -> Iterable[CatalogFactoryFunc]:
    """Return a factory object, that can be used to configure new Catalog CRs
    for the 'app-operator' running in the cluster. Fixture's scope is 'module'."""
    yield from _catalog_factory_impl(kube_cluster, namespace_factory)


def _catalog_factory_impl(
    kube_cluster: Cluster, namespace_factory: NamespaceFactoryFunc
) -> Iterable[CatalogFactoryFunc]:
    created_objects: List[CatalogCR] = []

    yield catalog_factory_func(kube_cluster.kube_client, created_objects, namespace_factory)

    delete_and_wait_for_objects(kube_cluster.kube_client, CatalogCR, created_objects)


@pytest.fixture(scope="module")DOCS
def app_factory(
    kube_cluster: Cluster, catalog_factory: CatalogFactoryFunc, namespace_factory: NamespaceFactoryFunc
) -> Iterable[AppFactoryFunc]:
    """Returns a factory function which can be used to install an app using App CR. Fixture's scope is 'module'."""
    yield from _app_factory_impl(kube_cluster, catalog_factory, namespace_factory)


@pytest.fixture(scope="function")DOCS
def app_factory_function_scope(
    kube_cluster: Cluster, catalog_factory: CatalogFactoryFunc, namespace_factory: NamespaceFactoryFunc
) -> Iterable[AppFactoryFunc]:
    """Returns a factory function which can be used to install an app using App CR. Fixture's scope is 'module'."""
    yield from _app_factory_impl(kube_cluster, catalog_factory, namespace_factory)


def _app_factory_impl(
    kube_cluster: Cluster, catalog_factory: CatalogFactoryFunc, namespace_factory: NamespaceFactoryFunc
) -> Iterable[AppFactoryFunc]:
    """Returns a factory function which can be used to install an app using App CR."""

    created_apps: List[ConfiguredApp] = []

    yield app_factory_func(kube_cluster.kube_client, catalog_factory, namespace_factory, created_apps)

    apps_to_delete = [a.app for a in created_apps]
    delete_and_wait_for_objects(kube_cluster.kube_client, AppCR, apps_to_delete)
    cms_to_delete = [a.app_cm for a in created_apps if a.app_cm is not None]
    delete_and_wait_for_objects(kube_cluster.kube_client, ConfigMap, cms_to_delete)