Source code for saf.forward.test

# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
Test forwarder plugin.

The test forward plugin exists as an implementation example and also to be able
to test the salt-analytics-framework

It doesn't really do anything to the collected event.
"""
from __future__ import annotations

import asyncio
import json
import logging
import pathlib
from typing import Any
from typing import Optional
from typing import Type

from pydantic import model_validator

from saf.models import CollectedEvent
from saf.models import ForwardConfigBase
from saf.models import PipelineRunContext

log = logging.getLogger(__name__)


[docs]class TestForwardConfig(ForwardConfigBase): """ Test forwarder configuration. """ sleep: float = 0.0 path: Optional[pathlib.Path] = None message: Optional[str] = None dump_event: bool = False add_event_to_shared_cache: bool = False @model_validator(mode="before") @classmethod def _check_mutually_exclusive_parameters( cls: Type[TestForwardConfig], values: dict[str, Any] ) -> dict[str, Any]: path = values.get("path") add_event_to_shared_cache = values.get("add_event_to_shared_cache") or False if path and add_event_to_shared_cache: msg = "The 'path' and 'add_event_to_shared_cache' are mutually exclusive" raise ValueError(msg) return values
[docs]def get_config_schema() -> Type[TestForwardConfig]: """ Get the noop plugin configuration schema. """ return TestForwardConfig
[docs]async def forward( *, ctx: PipelineRunContext[TestForwardConfig], event: CollectedEvent, ) -> None: """ Method called to forward the event. """ config = ctx.config log.info("Forwarding using %s: %s", config.name, event) if config.sleep > 0: await asyncio.sleep(config.sleep) if config.add_event_to_shared_cache: log.info( "Storing collected events in `pipeline.shared_cache` under " "the 'collected_events' key." ) if "collected_events" not in ctx.shared_cache: ctx.shared_cache["collected_events"] = [] ctx.shared_cache["collected_events"].append(event) elif config.path: if config.message: if config.dump_event: dump_text = json.dumps({config.message: event.model_dump()}) else: dump_text = config.message elif config.dump_event: dump_text = event.json() else: dump_text = "" log.info("Writing into %s. Contents: %s", config.path, dump_text) with config.path.open("a", encoding="utf-8") as wfh: wfh.write(dump_text + "\n") log.info("Forwarded using %s: %s", config.name, event)