Source code for saf.utils.eventbus

# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
Wrapper functions around Salt's eventbus.
"""
from __future__ import annotations

import asyncio
import copy
import fnmatch
import logging
import queue
from typing import TYPE_CHECKING
from typing import Any
from typing import AsyncIterator

import salt.utils.event

from saf.models import SaltEvent

if TYPE_CHECKING:
    from queue import Queue

log = logging.getLogger(__name__)


def _construct_event(event_data: dict[str, Any]) -> SaltEvent | None:
    """
    Construct a :py:class:`~saf.models.SaltEvent` from a salt event payload.
    """
    salt_event = None
    try:
        event_raw_data = copy.deepcopy(event_data)
        # Salt's event data has some "private" keys, for example, "_stamp".
        # We'll just store a full_data attribute and clean up the regular data of these keys
        for key in list(event_data):
            if key.startswith("_"):
                event_data.pop(key)
        salt_event = SaltEvent(
            tag=event_data["tag"],
            stamp=event_raw_data["_stamp"],
            data=event_data["data"],
            raw_data=event_raw_data,
        )
        log.debug("Constructed SaltEvent: %s", salt_event)
    except Exception:
        log.exception("Failed to construct a SaltEvent")
    return salt_event


def _process_events(
    opts: dict[str, Any],
    events_queue: Queue[SaltEvent],
    tags: set[str],
) -> None:
    """
    Collect events from Salt's event bus.

    This function is meant to run on a separate threads until Salt stops using tornado or
    it's safe to use asyncio as the asynchronous loop.
    """
    opts["file_client"] = "local"
    with salt.utils.event.get_event(
        opts["__role"],
        sock_dir=opts["sock_dir"],
        transport=opts["transport"],
        opts=opts,
        listen=True,
    ) as eventbus:
        for event in eventbus.iter_events(full=True, auto_reconnect=True):
            if not event:
                continue
            event_tag = event["tag"]
            event_data = event["data"]
            if event_tag == "__beacons_return":
                # Special case __beacons_return event since it's basically a container
                # for all of the Salt's beacon events on each beacons collect iteration

                for beacon_event_data in event_data["beacons"]:
                    for tag in tags:
                        try:
                            if fnmatch.fnmatch(beacon_event_data["tag"], tag):
                                if "_stamp" not in beacon_event_data:
                                    # Wrapped beacon data usually lack the _stamp key/value pair. Use parent's.
                                    beacon_event_data["_stamp"] = event_data["_stamp"]
                                # Unwrap the nested data key/value pair if needed
                                if "data" in beacon_event_data["data"]:
                                    beacon_event_data["data"] = beacon_event_data["data"].pop(
                                        "data"
                                    )
                                log.debug(
                                    "Matching Beacon event; TAG: %r DATA: %r",
                                    beacon_event_data["tag"],
                                    beacon_event_data["data"],
                                )
                                salt_event = _construct_event(beacon_event_data)
                                if salt_event:
                                    events_queue.put_nowait(salt_event)
                                # We found a matching tag, stop iterating tags
                                break
                        except Exception:
                            log.exception(
                                "Ran into an error while processing beacon events",
                            )
                # No additional processing required, process to next event from the event bus
                continue

            # Non special cased salt event tags
            for tag in tags:
                if fnmatch.fnmatch(event_tag, tag):
                    log.debug("Matching event; TAG: %r DATA: %r", event_tag, event_data)
                    salt_event = _construct_event(event_data)
                    if salt_event:
                        events_queue.put_nowait(salt_event)
                    # We found a matching tag, stop iterating tags
                    break


async def _start_event_listener(
    *,
    opts: dict[str, Any],
    events_queue: Queue[SaltEvent],
    tags: set[str],
) -> None:
    # We don't want to mix asyncio and tornado loops,
    # so, we defer the salt event listening to a separate
    # thread.
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(
        None,
        _process_events,
        opts,
        events_queue,
        tags,
    )


[docs]async def iter_events(*, tags: set[str], opts: dict[str, Any]) -> AsyncIterator[SaltEvent]: """ Method called to collect events. """ loop = asyncio.get_event_loop() events_queue: Queue[SaltEvent] = queue.Queue() process_events_task = loop.create_task( _start_event_listener(opts=opts, events_queue=events_queue, tags=tags) ) try: while True: try: event = events_queue.get_nowait() yield event except queue.Empty: await asyncio.sleep(0.1) finally: process_events_task.cancel()