saf package#

Subpackages#

Submodules#

saf.manager module#

Salt Analytics Framework Pipelines Manager.

class saf.manager.Manager(config: AnalyticsConfig)[source]#

Bases: object

Pipelines Manager.

async run() None[source]#

Async entry point to run the pipelines.

async await_stopped() None[source]#

Wait until all pipelines have been stopped.

async start_pipelines() None[source]#

Start the pipelines.

async stop_pipelines() None[source]#

Stop the pipelines.

async start_pipeline(name: str) str | None[source]#

Start a pipeline by name.

async stop_pipeline(name: str) str | None[source]#

Stop a pipeline by name.

saf.models module#

Salt Analytics Framework Models.

class saf.models.NonMutableModel[source]#

Bases: BaseModel

Base class for non mutable models.

model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class saf.models.NonMutableConfig[source]#

Bases: BaseModel

Base class for non-mutable configurations.

model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

property parent: AnalyticsConfig#

Return the parent configuration schema.

model_fields: ClassVar[dict[str, FieldInfo]] = {}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

model_post_init(__context: Any) None#

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • __context – The context.

class saf.models.PluginConfigMixin(*, plugin: str)[source]#

Bases: NonMutableConfig

Base class for plugin configuration schemas.

plugin: str#
property name: str#

Return the plugin name as defined in the configuration file.

property loaded_plugin: ModuleType#

Return the plugin instance(module) for which this configuration refers to.

model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'plugin': FieldInfo(annotation=str, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

model_post_init(__context: Any) None#

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • __context – The context.

class saf.models.CollectConfigBase(*, plugin: str)[source]#

Bases: PluginConfigMixin

Base config schema for collect plugins.

property loaded_plugin: ModuleType#

Return the plugin instance(module) for which this configuration refers to.

model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'plugin': FieldInfo(annotation=str, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

plugin: str#
class saf.models.ProcessConfigBase(*, plugin: str)[source]#

Bases: PluginConfigMixin

Base config schema for process plugins.

property loaded_plugin: ModuleType#

Return the plugin instance(module) for which this configuration refers to.

model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'plugin': FieldInfo(annotation=str, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

plugin: str#
class saf.models.ForwardConfigBase(*, plugin: str)[source]#

Bases: PluginConfigMixin

Base config schema for forward plugins.

property loaded_plugin: ModuleType#

Return the plugin instance(module) for which this configuration refers to.

model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'plugin': FieldInfo(annotation=str, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

plugin: str#
class saf.models.PipelineConfig(*, collect: List[str], process: List[str] = None, forward: List[str], enabled: bool = True, restart: bool = True)[source]#

Bases: NonMutableConfig

Base config schema for pipeline configuration.

collect: List[str]#
process: List[str]#
forward: List[str]#
enabled: bool#
restart: bool#
property name: str#

Return the pipeline name as defined in the configuration file.

model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'collect': FieldInfo(annotation=List[str], required=True), 'enabled': FieldInfo(annotation=bool, required=False, default=True), 'forward': FieldInfo(annotation=List[str], required=True), 'process': FieldInfo(annotation=List[str], required=False, default_factory=list), 'restart': FieldInfo(annotation=bool, required=False, default=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

model_post_init(__context: Any) None#

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • __context – The context.

class saf.models.AnalyticsConfig(*, collectors: Dict[str, CollectConfigBase[CollectConfigBase]], processors: Dict[str, ProcessConfigBase[ProcessConfigBase]] = None, forwarders: Dict[str, ForwardConfigBase[ForwardConfigBase]], pipelines: Dict[str, PipelineConfig], salt_config: Dict[str, Any])[source]#

Bases: BaseModel

Salt Analytics Framework configuration.

collectors: Dict[str, CollectConfig]#
processors: Dict[str, ProcessConfig]#
forwarders: Dict[str, ForwardConfig]#
pipelines: Dict[str, PipelineConfig]#
salt_config: Dict[str, Any]#
model_post_init(_AnalyticsConfig__context: Any) None[source]#

Set the _parent attribute on child schemas.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'collectors': FieldInfo(annotation=Dict[str, Annotated[saf.models.CollectConfigBase, PlainValidator(func=<function _instantiate_collector>)]], required=True), 'forwarders': FieldInfo(annotation=Dict[str, Annotated[saf.models.ForwardConfigBase, PlainValidator(func=<function _instantiate_forwarder>)]], required=True), 'pipelines': FieldInfo(annotation=Dict[str, saf.models.PipelineConfig], required=True), 'processors': FieldInfo(annotation=Dict[str, Annotated[saf.models.ProcessConfigBase, PlainValidator(func=<function _instantiate_processor>)]], required=False, default_factory=dict), 'salt_config': FieldInfo(annotation=Dict[str, Any], required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class saf.models.CollectedEvent(*, data: Mapping[str, Any], timestamp: Optional[datetime] = None)[source]#

Bases: BaseModel

Class representing each of the collected events.

data: Mapping[str, Any]#
timestamp: Optional[datetime]#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'data': FieldInfo(annotation=Mapping[str, Any], required=True), 'timestamp': FieldInfo(annotation=Union[datetime, NoneType], required=False, default_factory=utcnow)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class saf.models.SaltEvent(*, tag: str, stamp: datetime, data: Dict[str, Any], raw_data: Dict[str, Any])[source]#

Bases: NonMutableModel

Class representing an event from Salt’s event bus.

tag: str#
stamp: datetime#
data: Dict[str, Any]#
raw_data: Dict[str, Any]#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'data': FieldInfo(annotation=Dict[str, Any], required=True), 'raw_data': FieldInfo(annotation=Dict[str, Any], required=True), 'stamp': FieldInfo(annotation=datetime, required=True), 'tag': FieldInfo(annotation=str, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class saf.models.RuntimeAnalyticsInfo(*, version: str)[source]#

Bases: NonMutableModel

Salt analytics runtime information.

version: str#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'version': FieldInfo(annotation=str, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class saf.models.RuntimeSaltInfo(*, id: str, role: str, version: str, version_info: Tuple[int, ...])[source]#

Bases: NonMutableModel

Salt runtime information.

id: str#
role: str#
version: str#
version_info: Tuple[int, ...]#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'id': FieldInfo(annotation=str, required=True), 'role': FieldInfo(annotation=str, required=True), 'version': FieldInfo(annotation=str, required=True), 'version_info': FieldInfo(annotation=Tuple[int, ...], required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class saf.models.RuntimeInfo(*, salt: RuntimeSaltInfo, analytics: RuntimeAnalyticsInfo)[source]#

Bases: NonMutableModel

Salt analytics pipelines runtime information.

salt: RuntimeSaltInfo#
analytics: RuntimeAnalyticsInfo#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'analytics': FieldInfo(annotation=RuntimeAnalyticsInfo, required=True), 'salt': FieldInfo(annotation=RuntimeSaltInfo, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class saf.models.PipelineRunContext(*, config: PipelineRunContextConfigType, cache: Dict[str, Any] = None, shared_cache: Dict[str, Any] = None)[source]#

Bases: NonMutableModel, Generic[PipelineRunContextConfigType]

Class representing a pipeline run context.

config: PipelineRunContextConfigType#
cache: Dict[str, Any]#
shared_cache: Dict[str, Any]#
property pipeline_config: AnalyticsConfig#

Return the analytics configuration.

property salt_config: Dict[str, Any]#

Return the salt configuration.

property info: RuntimeInfo#

Return the pipeline runtime information.

model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'cache': FieldInfo(annotation=Dict[str, Any], required=False, default_factory=dict), 'config': FieldInfo(annotation=~PipelineRunContextConfigType, required=True), 'shared_cache': FieldInfo(annotation=Dict[str, Any], required=False, default_factory=dict)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

model_post_init(__context: Any) None#

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • __context – The context.

saf.pipeline module#

Salt Analytics Framework Pipeline.

class saf.pipeline.Pipeline(name: str, config: PipelineConfig)[source]#

Bases: object

Salt Analytics Pipeline.

async run() None[source]#

Run the pipeline.

__enter__() P[source]#

Enter pipeline context managert.

__exit__(*_: Any) None[source]#

Exit pipeline context managert.

saf.plugins module#

Salt Analytics Framework Plugins Support.

class saf.plugins.PluginsList[source]#

Bases: object

Plugins manager.

__repr__() str[source]#

Return a printable representation of the class instance.

static instance() PluginsList[source]#

Return the cached instance of the plugins listing.

If it doesn’t exist yet, a new instance is created and cached.

load_plugins() None[source]#

Load the available salt analytics framework plugins.

saf.plugins.catch_entry_points_exception(entry_point: str) Generator[SimpleNamespace, None, None][source]#

Context manager to catch exceptions while loading entry points.

saf.version module#