Source code for saf.saltext.engines.analytics

# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
#
"""
Salt engine module.
"""
from __future__ import annotations

import asyncio
import logging
import pathlib
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable

import aiorun
from salt.utils import yaml

from saf.manager import Manager
from saf.models import AnalyticsConfig

if TYPE_CHECKING:
    # These dunder names are never assigned in this module; they are declared
    # here only so static type checkers know their shapes.
    # NOTE(review): presumably the Salt loader injects them at runtime -- confirm.
    __salt__: dict[str, Callable[..., Any]]
    __opts__: dict[str, Any]
    __salt_system_encoding__: str

# Module-level logger named after this module.
log = logging.getLogger(__name__)


# Name returned by ``__virtual__()`` when the engine is allowed to load.
__virtualname__ = "analytics"


def __virtual__() -> str | tuple[bool, str]:
    """
    Decide whether the salt analytics engine should be loaded.

    Returns:
        ``__virtualname__`` when ``get_config_dict()`` yields something other
        than ``None``; otherwise the tuple ``(False, "<failure reason>")``.
    """
    # Success path first: any non-``None`` configuration enables the engine.
    if get_config_dict() is not None:
        return __virtualname__
    return False, "Could not find any valid salt analytics configuration"
[docs]def get_config_dict() -> dict[str, Any] | None: """ Return the configuration dictionary for salt analytics. """ config_dict: dict[str, Any] | None config_dict = __salt__["config.get"]("analytics") # pylint: disable=undefined-variable if not config_dict: config_dir = pathlib.Path(__opts__["config_dir"]) # pylint: disable=undefined-variable config_file = config_dir / "analytics" if config_file.exists(): config_dict = yaml.safe_load( config_file.read_text( encoding=__salt_system_encoding__, # pylint: disable=undefined-variable ) ) if config_dict: config_dict["salt_config"] = __opts__.copy() return config_dict
def start() -> None:
    """
    Start the salt analytics engine.

    A fresh asyncio event loop is created and installed as the current loop,
    the analytics configuration is validated into an ``AnalyticsConfig``, and
    a ``Manager`` built from it is handed to ``aiorun.run``.
    """
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)

    # Validation and manager construction happen after the loop is installed,
    # matching the order in which the engine has always set itself up.
    analytics_config = AnalyticsConfig.model_validate(get_config_dict())
    manager = Manager(analytics_config)

    main_task = manager.run()
    on_shutdown = manager.await_stopped()
    aiorun.run(
        main_task,
        loop=event_loop,
        stop_on_unhandled_errors=True,
        shutdown_callback=on_shutdown,
    )