Source code for saf.manager
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
"""
Salt Analytics Framework Pipelines Manager.
"""
from __future__ import annotations
import asyncio
import logging
from asyncio import Task
from typing import TYPE_CHECKING
from typing import TypeVar
import aiorun
from saf.pipeline import Pipeline
if TYPE_CHECKING:
from saf.models import AnalyticsConfig
log = logging.getLogger(__name__)
MN = TypeVar("MN", bound="Manager")
[docs]class Manager:
"""
Pipelines Manager.
"""
def __init__(self: MN, config: AnalyticsConfig) -> None:
self.config = config
self.pipelines: dict[str, Pipeline] = {}
for name, pipeline_config in config.pipelines.items():
self.pipelines[name] = Pipeline(name, pipeline_config)
self.pipeline_tasks: dict[str, Task] = {} # type: ignore[type-arg]
self.loop = asyncio.get_event_loop()
[docs] async def run(self: MN) -> None:
"""
Async entry point to run the pipelines.
"""
await self.start_pipelines()
try:
while True:
try:
await asyncio.sleep(0.05)
except (KeyboardInterrupt, asyncio.CancelledError):
break
finally:
await aiorun.shutdown_waits_for(self.stop_pipelines())
[docs] async def await_stopped(self: MN) -> None:
"""
Wait until all pipelines have been stopped.
"""
await self.stop_pipelines()
[docs] async def start_pipelines(self: MN) -> None:
"""
Start the pipelines.
"""
for name in self.pipelines:
result = await self.start_pipeline(name)
if result is not None:
log.warning(result)
[docs] async def stop_pipelines(self: MN) -> None:
"""
Stop the pipelines.
"""
for name in list(self.pipeline_tasks):
result = await self.stop_pipeline(name)
if result is not None:
log.warning(result)
[docs] async def start_pipeline(self: MN, name: str) -> str | None:
"""
Start a pipeline by name.
"""
log.info("Starting pipeline %r", name)
if name not in self.pipelines:
return f"Cannot start unknown pipeline {name!r}"
pipeline = self.pipelines[name]
if pipeline.config.enabled is False:
return f"Pipeline {name!r} is disabled, skipping start."
if name in self.pipeline_tasks:
return f"Pipeline {name!r} is already running"
pipeline.__enter__()
self.pipeline_tasks[name] = self.loop.create_task(pipeline.run())
return None
[docs] async def stop_pipeline(self: MN, name: str) -> str | None:
"""
Stop a pipeline by name.
"""
log.info("Stopping pipeline %r", name)
if name not in self.pipeline_tasks:
return f"Pipeline {name!r} is not running. Not stopping it."
task = self.pipeline_tasks.pop(name)
if task.done() is not True:
task.cancel()
await task
pipeline = self.pipelines[name]
pipeline.__exit__()
return None