# Source code for saf.forward.disk
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
The disk forward plugin exists as an implementation example.
It just dumps the collected events to disk.
"""
from __future__ import annotations
import logging
import pathlib
from typing import Optional
from typing import Type
import aiofiles
from saf.models import CollectedEvent
from saf.models import ForwardConfigBase
from saf.models import PipelineRunContext
log = logging.getLogger(__name__)
class DiskConfig(ForwardConfigBase):
    """
    Settings accepted by the disk forward plugin.
    """

    # Directory that receives the event files; ``forward`` creates it
    # (including parents) when it does not exist yet.
    path: pathlib.Path
    # When set, every event is appended as one line to this file inside
    # ``path``. When unset, each event is written to its own
    # ``event-dump-<N>.json`` file inside ``path``.
    filename: Optional[str] = None
    # When true, events are serialized with a two-space JSON indent.
    pretty_print: bool = False
def get_config_schema() -> Type[DiskConfig]:
    """
    Get the disk plugin configuration schema.

    Returns the ``DiskConfig`` class itself, not an instance of it.
    """
    return DiskConfig
async def forward(
    *,
    ctx: PipelineRunContext[DiskConfig],
    event: CollectedEvent,
) -> None:
    """
    Forward a collected event by writing it to disk as JSON.

    The destination directory (``ctx.config.path``) is created, including
    parents, when it does not exist.

    * When ``ctx.config.filename`` is set, the event is appended to that file
      inside the directory, followed by a newline.
    * Otherwise the event is written to a new ``event-dump-<N>.json`` file
      inside the directory. ``N`` starts at the number of existing directory
      entries plus one and is advanced until an unused name is found, so an
      existing dump is never overwritten.

    When ``ctx.config.pretty_print`` is true the JSON is indented by two
    spaces.
    """
    config = ctx.config
    indent: Optional[int] = 2 if config.pretty_print else None

    # ``exist_ok=True`` removes the check-then-create race of a separate
    # ``exists()`` test: a concurrent creator no longer makes this call fail.
    config.path.mkdir(parents=True, exist_ok=True)

    if config.filename:
        dest = config.path / config.filename
        # Append mode creates the file when it is missing, so no separate
        # ``touch()`` is required beforehand.
        async with aiofiles.open(dest, "a", encoding="utf-8") as wfh:
            wrote = await wfh.write(f"{event.model_dump_json(indent=indent)}\n")
    else:
        payload = event.model_dump_json(indent=indent)
        # Same starting index as a plain "entry count + 1" scheme; the loop
        # only advances when that name is already taken.
        index = sum(1 for _ in config.path.iterdir()) + 1
        while True:
            dest = config.path / f"event-dump-{index}.json"
            try:
                # Exclusive creation ("x") fails instead of truncating when
                # the file already exists, e.g. after earlier dumps were
                # removed or unrelated files changed the entry count.
                async with aiofiles.open(dest, "x", encoding="utf-8") as wfh:
                    wrote = await wfh.write(payload)
            except FileExistsError:
                index += 1
            else:
                break
    log.debug("Wrote %s bytes to %s", wrote, dest)