Source code for saf.models

# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
Salt Analytics Framework Models.
"""
from __future__ import annotations

import logging
from datetime import datetime
from datetime import timezone
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import Generic
from typing import List
from typing import Mapping
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union

from pydantic import BaseModel
from pydantic import Field
from pydantic import PrivateAttr
from pydantic import validator
from pydantic.generics import GenericModel

from saf.plugins import PluginsList
from saf.utils import dt

if TYPE_CHECKING:
    from types import ModuleType

log = logging.getLogger(__name__)


[docs]class NonMutableModel(BaseModel): """ Base class for non mutable models. """ class Config: allow_mutation = False
NMC = TypeVar("NMC", bound="NonMutableConfig")
[docs]class NonMutableConfig(BaseModel): """ Base class for non-mutable configurations. """ _parent: AnalyticsConfig = PrivateAttr() @property def parent(self: NMC) -> AnalyticsConfig: """ Return the parent configuration schema. """ return self._parent class Config: allow_mutation = False underscore_attrs_are_private = True
PCMI = TypeVar("PCMI", bound="PluginConfigMixin")
[docs]class PluginConfigMixin(NonMutableConfig): """ Base class for plugin configuration schemas. """ plugin: str _name: str = PrivateAttr() @property def name(self: PCMI) -> str: """ Return the plugin name as defined in the configuration file. """ return self._name @property def loaded_plugin(self: PCMI) -> ModuleType: """ Return the plugin instance(module) for which this configuration refers to. """ raise NotImplementedError
CCB = TypeVar("CCB", bound="CollectConfigBase")
[docs]class CollectConfigBase(PluginConfigMixin): """ Base config schema for collect plugins. """ def __new__( cls: Type[CCB], plugin: str, **kwargs: Dict[str, Any], ) -> CollectConfigBase: """ Swap the ``cls`` to instantiate if necessary. If the targeted plugin provides a ``get_config_schema`` function, then this class instance will use that class instead of the default one """ try: plugin_module = PluginsList.instance().collectors[plugin] try: get_schema_func = plugin_module.get_config_schema cls = get_schema_func() # pylint: disable=self-cls-assignment except AttributeError: log.debug( "The %r collect plugin does not provide a 'get_config_schema' function, defaulting to %s", plugin, cls, ) except KeyError: pass instance: CollectConfigBase = PluginConfigMixin.__new__(cls) return instance @property def loaded_plugin(self: CCB) -> ModuleType: """ Return the plugin instance(module) for which this configuration refers to. """ return PluginsList.instance().collectors[self.plugin]
PCB = TypeVar("PCB", bound="ProcessConfigBase")
[docs]class ProcessConfigBase(PluginConfigMixin): """ Base config schema for process plugins. """ def __new__( cls: Type[PCB], plugin: str, **kwargs: Dict[str, Any], ) -> ProcessConfigBase: """ Swap the ``cls`` to instantiate if necessary. If the targeted plugin provides a ``get_config_schema`` function, then this class instance will use that class instead of the default one """ try: plugin_module = PluginsList.instance().processors[plugin] try: get_schema_func = plugin_module.get_config_schema cls = get_schema_func() # pylint: disable=self-cls-assignment except AttributeError: log.debug( "The %r process plugin does not provide a 'get_config_schema' function, defaulting to %s", plugin, cls, ) except KeyError: pass instance: ProcessConfigBase = PluginConfigMixin.__new__(cls) return instance @property def loaded_plugin(self: PCB) -> ModuleType: """ Return the plugin instance(module) for which this configuration refers to. """ return PluginsList.instance().processors[self.plugin]
FCB = TypeVar("FCB", bound="ForwardConfigBase")
[docs]class ForwardConfigBase(PluginConfigMixin): """ Base config schema for forward plugins. """ def __new__( cls: Type[FCB], plugin: str, **kwargs: Dict[str, Any], ) -> ForwardConfigBase: """ Swap the ``cls`` to instantiate if necessary. If the targeted plugin provides a ``get_config_schema`` function, then this class instance will use that class instead of the default one """ try: plugin_module = PluginsList.instance().forwarders[plugin] try: get_schema_func = plugin_module.get_config_schema cls = get_schema_func() # pylint: disable=self-cls-assignment except AttributeError: log.debug( "The %r forward plugin does not provide a 'get_config_schema' function, defaulting to %s", plugin, cls, ) except KeyError: pass instance: ForwardConfigBase = PluginConfigMixin.__new__(cls) return instance @property def loaded_plugin(self: FCB) -> ModuleType: """ Return the plugin instance(module) for which this configuration refers to. """ return PluginsList.instance().forwarders[self.plugin]
PC = TypeVar("PC", bound="PipelineConfig")
[docs]class PipelineConfig(NonMutableConfig): """ Base config schema for pipeline configuration. """ collect: List[str] process: List[str] = Field(default_factory=list) forward: List[str] enabled: bool = True restart: bool = True _name: str = PrivateAttr() @property def name(self: PC) -> str: """ Return the pipeline name as defined in the configuration file. """ return self._name
AC = TypeVar("AC", bound="AnalyticsConfig")
[docs]class AnalyticsConfig(BaseModel): """ Salt Analytics Framework configuration. """ collectors: Dict[str, CollectConfigBase] processors: Dict[str, ProcessConfigBase] = Field(default_factory=dict) forwarders: Dict[str, ForwardConfigBase] pipelines: Dict[str, PipelineConfig] salt_config: Dict[str, Any] @validator("pipelines", pre=True) @classmethod def _validate_pipelines( cls: Type[AC], pipelines: Dict[str, Dict[str, Any]] ) -> Dict[str, Dict[str, Any]]: for name, data in pipelines.items(): collect = data["collect"] if isinstance(collect, str): collect = [collect] process = data.get("process") forward = data["forward"] if process is None: process = [] elif isinstance(process, str): process = [process] if isinstance(forward, str): forward = [forward] pipelines[name]["collect"] = collect pipelines[name]["process"] = process pipelines[name]["forward"] = forward pipelines[name].setdefault("enabled", True) return pipelines def _init_private_attributes(self: AC) -> None: """ Set the `_parent` attribute on child schemas. """ super()._init_private_attributes() # Allow plugin configurations to access the full configuration, this instance for entry in (self.collectors, self.processors, self.forwarders, self.pipelines): if entry is None: continue for name, config in entry.items(): # type: ignore[attr-defined] config._name = name # pylint: disable=protected-access config._parent = self # pylint: disable=protected-access
[docs]class CollectedEvent(BaseModel): """ Class representing each of the collected events. """ data: Mapping[str, Any] timestamp: Optional[datetime] = Field(default_factory=dt.utcnow)
SE = TypeVar("SE", bound="SaltEvent")
[docs]class SaltEvent(NonMutableModel): """ Class representing an event from Salt's event bus. """ tag: str stamp: datetime data: Dict[str, Any] raw_data: Dict[str, Any] @staticmethod def _convert_stamp(stamp: str) -> datetime: _stamp: datetime try: _stamp = datetime.fromisoformat(stamp).replace(tzinfo=timezone.utc) except AttributeError: # pragma: no cover # Python < 3.7 _stamp = datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=timezone.utc) return _stamp @validator("stamp") @classmethod def _validate_stamp(cls: Type[SE], value: Union[str, datetime]) -> datetime: if isinstance(value, datetime): return value return SaltEvent._convert_stamp(value)
PipelineRunContextConfigType = TypeVar("PipelineRunContextConfigType", bound=NonMutableConfig)
[docs]class PipelineRunContext(GenericModel, Generic[PipelineRunContextConfigType]): """ Class representing a pipeline run context. """ config: PipelineRunContextConfigType cache: Dict[str, Any] = Field(default_factory=dict) shared_cache: Dict[str, Any] = Field(default_factory=dict) @property def pipeline_config(self) -> AnalyticsConfig: # noqa: ANN101 """ Return the analytics configuration. """ return self.config.parent @property def salt_config(self) -> Dict[str, Any]: # noqa: ANN101 """ Return the salt configuration. """ config: Dict[str, Any] = self.config.parent.salt_config return config