Source code for saf.collect.beacons

# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
The beacons collect plugin exists as an implementation example.

It listens to Salt's event bus for beacon events and generates
analytics events based off of those.
"""
from __future__ import annotations

import logging
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import AsyncIterator
from typing import Dict
from typing import List
from typing import Type
from typing import TypeVar
from typing import Union

from pydantic import field_validator

from saf.models import CollectConfigBase
from saf.models import CollectedEvent
from saf.models import PipelineRunContext
from saf.models import SaltEvent
from saf.utils import eventbus

log = logging.getLogger(__name__)

BCE = TypeVar("BCE", bound="BeaconCollectedEvent")


[docs]class BeaconsConfig(CollectConfigBase): """ Configuration schema for the beacons collect plugin. """ beacons: List[str]
[docs]class BeaconCollectedEvent(CollectedEvent): """ Beacons collected event. """ beacon: str tag: str stamp: datetime raw_data: Dict[str, Any] @staticmethod def _convert_stamp(stamp: str) -> datetime: _stamp: datetime try: _stamp = datetime.fromisoformat(stamp).replace(tzinfo=timezone.utc) except AttributeError: # pragma: no cover # Python < 3.7 _stamp = datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=timezone.utc) return _stamp @field_validator("stamp") @classmethod def _validate_stamp(cls: Type[BCE], value: Union[str, datetime]) -> datetime: if isinstance(value, datetime): return value return BeaconCollectedEvent._convert_stamp(value)
[docs]def get_config_schema() -> Type[BeaconsConfig]: """ Get the beacons plugin configuration schema. """ return BeaconsConfig
[docs]async def collect(*, ctx: PipelineRunContext[BeaconsConfig]) -> AsyncIterator[BeaconCollectedEvent]: """ Method called to collect events. """ config = ctx.config salt_event: SaltEvent tags = {f"salt/beacon/*/{beacon}/*" for beacon in config.beacons} log.info("The beacons collect plugin is configured to listen to tags: %s", tags) async for salt_event in eventbus.iter_events(opts=ctx.salt_config.copy(), tags=tags): yield BeaconCollectedEvent( beacon=salt_event.raw_data["beacon_name"], tag=salt_event.tag, stamp=salt_event.stamp, data=salt_event.data, raw_data=salt_event.raw_data, )