Source code for saf.forward.disk

# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
The disk forward plugin exists as an implementation example.

It just dumps the collected events to disk
"""
from __future__ import annotations

import logging
import pathlib
from typing import Optional
from typing import Type

import aiofiles

from saf.models import CollectedEvent
from saf.models import ForwardConfigBase
from saf.models import PipelineRunContext

log = logging.getLogger(__name__)


[docs]class DiskConfig(ForwardConfigBase): """ Configuration schema for the disk forward plugin. """ path: pathlib.Path filename: Optional[str] = None pretty_print: bool = False
[docs]def get_config_schema() -> Type[DiskConfig]: """ Get the noop plugin configuration schema. """ return DiskConfig
[docs]async def forward( *, ctx: PipelineRunContext[DiskConfig], event: CollectedEvent, ) -> None: """ Method called to forward the event. """ indent: Optional[int] = None config = ctx.config if config.pretty_print: indent = 2 if not config.path.exists(): config.path.mkdir(parents=True) if config.filename: dest = config.path / config.filename dest.touch() async with aiofiles.open(dest, "a", encoding="utf-8") as wfh: wrote = await wfh.write(f"{event.model_dump_json(indent=indent)}\n") else: file_count = len(list(config.path.iterdir())) dest = config.path / f"event-dump-{file_count + 1}.json" async with aiofiles.open(dest, "w", encoding="utf-8") as wfh: wrote = await wfh.write(event.model_dump_json(indent=indent)) log.debug("Wrote %s bytes to %s", wrote, dest)