Source code for saf.collect.test
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
Test collect plugin.
The test collect plugin exists as an implementation example and also to be able
to test the salt-analytics-framework
"""
from __future__ import annotations
import asyncio
import logging
import sys
from typing import AsyncIterator
from typing import Type
from pydantic import Field
from saf.models import CollectConfigBase
from saf.models import CollectedEvent
from saf.models import PipelineRunContext
log = logging.getLogger(__name__)
[docs]class TestCollectConfig(CollectConfigBase):
"""
Test collector configuration.
"""
interval: float = Field(0.1, gt=0)
count: int = Field(sys.maxsize, gt=0, lt=sys.maxsize)
[docs]def get_config_schema() -> Type[TestCollectConfig]:
"""
Get the test collect plugin configuration schema.
"""
return TestCollectConfig
[docs]async def collect(
*,
ctx: PipelineRunContext[TestCollectConfig],
) -> AsyncIterator[CollectedEvent]:
"""
Method called to collect events, in this case, generate.
"""
config = ctx.config
log.info("Generating test events using collector config named %r", config.name)
counter = 1
await asyncio.sleep(config.interval)
while counter <= config.count:
yield CollectedEvent(
data={
"name": config.name,
"count": counter,
}
)
await asyncio.sleep(config.interval)
counter += 1
log.info(
"Finished generating %d events for collector config named %r", config.count, config.name
)