Source code for saf.collect.salt_exec

# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
A collect plugin that simply collects the output of a salt execution module.
"""
from __future__ import annotations

import asyncio
import logging
from typing import Any
from typing import AsyncIterator
from typing import Dict
from typing import List
from typing import Type

import salt.loader
from pydantic import Field

from saf.models import CollectConfigBase
from saf.models import CollectedEvent
from saf.models import PipelineRunContext

log = logging.getLogger(__name__)


[docs]class SaltExecConfig(CollectConfigBase): """ Configuration schema for the salt_exec collect plugin. """ interval: float = 5 fn: str = "test.ping" args: List[Any] = Field(default_factory=list) kwargs: Dict[str, Any] = Field(default_factory=dict)
[docs]def get_config_schema() -> Type[SaltExecConfig]: """ Get the salt_exec plugin configuration schema. """ return SaltExecConfig
[docs]async def collect(*, ctx: PipelineRunContext[SaltExecConfig]) -> AsyncIterator[CollectedEvent]: """ Method called to collect events. """ config = ctx.config # Load salt functions and pick out the desired one loaded_funcs = salt.loader.minion_mods(config.parent.salt_config) loaded_fn = loaded_funcs[config.fn] while True: ret = loaded_fn(*config.args, **config.kwargs) event = CollectedEvent(data={"ret": ret}) log.debug("CollectedEvent: %s", event) yield event await asyncio.sleep(config.interval)