Source code for saf.collect.file

# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
A file collector plugin.
"""
from __future__ import annotations

import asyncio
import glob
import logging
import os
import pathlib
import sys
from typing import AsyncIterator
from typing import List
from typing import Type

import aiofiles
import aiostream.stream

from saf.models import CollectConfigBase
from saf.models import CollectedEvent
from saf.models import PipelineRunContext

if sys.version_info < (3, 12):
    from typing_extensions import TypedDict
else:
    from typing import TypedDict  # type: ignore[attr-defined,no-redef]

log = logging.getLogger(__name__)


class CollectedLineData(TypedDict):
    """
    Collected event line data definition.
    """

    line: str
    source: pathlib.Path


class CollectedLineEvent(CollectedEvent):
    """
    Collected line event definition.
    """

    data: CollectedLineData
    backfill: bool = False


class FileCollectConfig(CollectConfigBase):
    """
    Configuration schema for the file collect plugin.
    """

    paths: List[pathlib.Path]
    # If true, starts at the beginning of a file, else at the end
    backfill: bool = False


def get_config_schema() -> Type[FileCollectConfig]:
    """
    Get the file collect plugin configuration schema.
    """
    return FileCollectConfig


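# A minimal sketch (illustrative, not part of the original module) of what a
# config instance looks like; the literal paths are hypothetical, and it is
# assumed here that ``CollectConfigBase`` adds no required fields of its own.
def _example_config() -> FileCollectConfig:
    return FileCollectConfig(
        paths=[pathlib.Path("/var/log/syslog"), pathlib.Path("/var/log/*.log")],
        backfill=True,  # read existing contents before tailing
    )

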
async def _process_file(
    *, path: pathlib.Path, backfill: bool = False
) -> AsyncIterator[CollectedLineEvent]:
    """
    Process the given file and `yield` an event per line read.
    """
    async with aiofiles.open(path) as rfh:
        if backfill:
            # Replay the whole file from the beginning.
            async for line in rfh:
                yield CollectedLineEvent(
                    data=CollectedLineData(line=line, source=path), backfill=True
                )
        else:
            # Tail the file: seek to the end and poll for newly appended lines.
            await rfh.seek(0, os.SEEK_END)
            while True:
                line = await rfh.readline()
                if not line:
                    await asyncio.sleep(0.5)
                    continue
                yield CollectedLineEvent(data=CollectedLineData(line=line, source=path))


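# A minimal sketch (not part of the original module) of driving
# ``_process_file`` directly; ``/tmp/example.log`` is a hypothetical path.
async def _example_backfill() -> None:
    async for event in _process_file(
        path=pathlib.Path("/tmp/example.log"), backfill=True
    ):
        # Each event carries the line text and the file it came from.
        print(event.data["line"], end="")
# e.g. asyncio.run(_example_backfill())

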
async def collect(
    *, ctx: PipelineRunContext[FileCollectConfig]
) -> AsyncIterator[CollectedLineEvent]:
    """
    Method called to collect file contents.
    """
    config = ctx.config
    streams = []
    for entry in config.paths:
        # Each configured path is treated as a glob so wildcards can match
        # multiple files.
        glob_matches = glob.glob(str(entry))
        if not glob_matches:
            log.error(
                "The glob matching for provided path '%s' did not return any results. Ignoring.",
                entry,
            )
            continue
        for match in glob_matches:
            path = pathlib.Path(match)
            if not path.is_file():
                log.error(
                    "The provided path '%s' does not exist or is not a file. Ignoring.",
                    path,
                )
                continue
            streams.append(_process_file(path=path, backfill=config.backfill))
    # Merge all per-file event streams into a single stream of events.
    combined = aiostream.stream.merge(*streams)
    async with combined.stream() as stream:
        async for event in stream:
            yield event


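# For reference, ``aiostream.stream.merge`` interleaves items from several
# async iterators as they become available, which is how lines tailed from
# multiple files above end up in one stream. A self-contained sketch:
async def _example_merge() -> None:
    async def numbers(start: int) -> AsyncIterator[int]:
        for i in range(start, start + 3):
            await asyncio.sleep(0.1)
            yield i

    merged = aiostream.stream.merge(numbers(0), numbers(100))
    async with merged.stream() as stream:
        async for item in stream:
            print(item)  # items arrive from both iterators as they are produced
# e.g. asyncio.run(_example_merge())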