saf package#

Subpackages#

Submodules#

saf.manager module#

Salt Analytics Framework Pipelines Manager.

class saf.manager.Manager(config: AnalyticsConfig)[source]#

Bases: object

Pipelines Manager.

async run() → None[source]#

Async entry point to run the pipelines.

async await_stopped() → None[source]#

Wait until all pipelines have been stopped.

async start_pipelines() → None[source]#

Start the pipelines.

async stop_pipelines() → None[source]#

Stop the pipelines.

async start_pipeline(name: str) → str | None[source]#

Start a pipeline by name.

async stop_pipeline(name: str) → str | None[source]#

Stop a pipeline by name.

saf.models module#

Salt Analytics Framework Models.

pydantic model saf.models.NonMutableModel[source]#

Bases: BaseModel

Base class for non-mutable models.

Show JSON schema
{
   "title": "NonMutableModel",
   "description": "Base class for non-mutable models.",
   "type": "object",
   "properties": {}
}

Config:
  • allow_mutation: bool = False

pydantic model saf.models.NonMutableConfig[source]#

Bases: BaseModel

Base class for non-mutable configurations.

Show JSON schema
{
   "title": "NonMutableConfig",
   "description": "Base class for non-mutable configurations.",
   "type": "object",
   "properties": {}
}

Config:
  • allow_mutation: bool = False

  • underscore_attrs_are_private: bool = True

property parent: AnalyticsConfig#

Return the parent configuration schema.

pydantic model saf.models.PluginConfigMixin[source]#

Bases: NonMutableConfig

Base class for plugin configuration schemas.

Show JSON schema
{
   "title": "PluginConfigMixin",
   "description": "Base class for plugin configuration schemas.",
   "type": "object",
   "properties": {
      "plugin": {
         "title": "Plugin",
         "type": "string"
      }
   },
   "required": [
      "plugin"
   ]
}

Config:
  • allow_mutation: bool = False

  • underscore_attrs_are_private: bool = True

Fields:
field plugin: str [Required]#
property name: str#

Return the plugin name as defined in the configuration file.

property loaded_plugin: ModuleType#

Return the plugin instance (module) to which this configuration refers.

pydantic model saf.models.CollectConfigBase[source]#

Bases: PluginConfigMixin

Base config schema for collect plugins.

Show JSON schema
{
   "title": "CollectConfigBase",
   "description": "Base config schema for collect plugins.",
   "type": "object",
   "properties": {
      "plugin": {
         "title": "Plugin",
         "type": "string"
      }
   },
   "required": [
      "plugin"
   ]
}

Config:
  • allow_mutation: bool = False

  • underscore_attrs_are_private: bool = True

Fields:
property loaded_plugin: ModuleType#

Return the plugin instance (module) to which this configuration refers.

field plugin: str [Required]#
pydantic model saf.models.ProcessConfigBase[source]#

Bases: PluginConfigMixin

Base config schema for process plugins.

Show JSON schema
{
   "title": "ProcessConfigBase",
   "description": "Base config schema for process plugins.",
   "type": "object",
   "properties": {
      "plugin": {
         "title": "Plugin",
         "type": "string"
      }
   },
   "required": [
      "plugin"
   ]
}

Config:
  • allow_mutation: bool = False

  • underscore_attrs_are_private: bool = True

Fields:
property loaded_plugin: ModuleType#

Return the plugin instance (module) to which this configuration refers.

field plugin: str [Required]#
pydantic model saf.models.ForwardConfigBase[source]#

Bases: PluginConfigMixin

Base config schema for forward plugins.

Show JSON schema
{
   "title": "ForwardConfigBase",
   "description": "Base config schema for forward plugins.",
   "type": "object",
   "properties": {
      "plugin": {
         "title": "Plugin",
         "type": "string"
      }
   },
   "required": [
      "plugin"
   ]
}

Config:
  • allow_mutation: bool = False

  • underscore_attrs_are_private: bool = True

Fields:
property loaded_plugin: ModuleType#

Return the plugin instance (module) to which this configuration refers.

field plugin: str [Required]#
pydantic model saf.models.PipelineConfig[source]#

Bases: NonMutableConfig

Base config schema for pipeline configuration.

Show JSON schema
{
   "title": "PipelineConfig",
   "description": "Base config schema for pipeline configuration.",
   "type": "object",
   "properties": {
      "collect": {
         "title": "Collect",
         "type": "array",
         "items": {
            "type": "string"
         }
      },
      "process": {
         "title": "Process",
         "type": "array",
         "items": {
            "type": "string"
         }
      },
      "forward": {
         "title": "Forward",
         "type": "array",
         "items": {
            "type": "string"
         }
      },
      "enabled": {
         "title": "Enabled",
         "default": true,
         "type": "boolean"
      },
      "restart": {
         "title": "Restart",
         "default": true,
         "type": "boolean"
      }
   },
   "required": [
      "collect",
      "forward"
   ]
}

Config:
  • allow_mutation: bool = False

  • underscore_attrs_are_private: bool = True

Fields:
field collect: List[str] [Required]#
field process: List[str] [Optional]#
field forward: List[str] [Required]#
field enabled: bool = True#
field restart: bool = True#
property name: str#

Return the pipeline name as defined in the configuration file.

pydantic model saf.models.AnalyticsConfig[source]#

Bases: BaseModel

Salt Analytics Framework configuration.

Show JSON schema
{
   "title": "AnalyticsConfig",
   "description": "Salt Analytics Framework configuration.",
   "type": "object",
   "properties": {
      "collectors": {
         "title": "Collectors",
         "type": "object",
         "additionalProperties": {
            "$ref": "#/definitions/CollectConfigBase"
         }
      },
      "processors": {
         "title": "Processors",
         "type": "object",
         "additionalProperties": {
            "$ref": "#/definitions/ProcessConfigBase"
         }
      },
      "forwarders": {
         "title": "Forwarders",
         "type": "object",
         "additionalProperties": {
            "$ref": "#/definitions/ForwardConfigBase"
         }
      },
      "pipelines": {
         "title": "Pipelines",
         "type": "object",
         "additionalProperties": {
            "$ref": "#/definitions/PipelineConfig"
         }
      },
      "salt_config": {
         "title": "Salt Config",
         "type": "object"
      }
   },
   "required": [
      "collectors",
      "forwarders",
      "pipelines",
      "salt_config"
   ],
   "definitions": {
      "CollectConfigBase": {
         "title": "CollectConfigBase",
         "description": "Base config schema for collect plugins.",
         "type": "object",
         "properties": {
            "plugin": {
               "title": "Plugin",
               "type": "string"
            }
         },
         "required": [
            "plugin"
         ]
      },
      "ProcessConfigBase": {
         "title": "ProcessConfigBase",
         "description": "Base config schema for process plugins.",
         "type": "object",
         "properties": {
            "plugin": {
               "title": "Plugin",
               "type": "string"
            }
         },
         "required": [
            "plugin"
         ]
      },
      "ForwardConfigBase": {
         "title": "ForwardConfigBase",
         "description": "Base config schema for forward plugins.",
         "type": "object",
         "properties": {
            "plugin": {
               "title": "Plugin",
               "type": "string"
            }
         },
         "required": [
            "plugin"
         ]
      },
      "PipelineConfig": {
         "title": "PipelineConfig",
         "description": "Base config schema for pipeline configuration.",
         "type": "object",
         "properties": {
            "collect": {
               "title": "Collect",
               "type": "array",
               "items": {
                  "type": "string"
               }
            },
            "process": {
               "title": "Process",
               "type": "array",
               "items": {
                  "type": "string"
               }
            },
            "forward": {
               "title": "Forward",
               "type": "array",
               "items": {
                  "type": "string"
               }
            },
            "enabled": {
               "title": "Enabled",
               "default": true,
               "type": "boolean"
            },
            "restart": {
               "title": "Restart",
               "default": true,
               "type": "boolean"
            }
         },
         "required": [
            "collect",
            "forward"
         ]
      }
   }
}

Fields:
Validators:
field collectors: Dict[str, CollectConfigBase] [Required]#
field processors: Dict[str, ProcessConfigBase] [Optional]#
field forwarders: Dict[str, ForwardConfigBase] [Required]#
field pipelines: Dict[str, PipelineConfig] [Required]#
Validated by:
  • _validate_pipelines

field salt_config: Dict[str, Any] [Required]#
pydantic model saf.models.CollectedEvent[source]#

Bases: BaseModel

Class representing each of the collected events.

Show JSON schema
{
   "title": "CollectedEvent",
   "description": "Class representing each of the collected events.",
   "type": "object",
   "properties": {
      "data": {
         "title": "Data",
         "type": "object"
      },
      "timestamp": {
         "title": "Timestamp",
         "type": "string",
         "format": "date-time"
      }
   },
   "required": [
      "data"
   ]
}

Fields:
field data: Mapping[str, Any] [Required]#
field timestamp: Optional[datetime] [Optional]#
pydantic model saf.models.SaltEvent[source]#

Bases: NonMutableModel

Class representing an event from Salt’s event bus.

Show JSON schema
{
   "title": "SaltEvent",
   "description": "Class representing an event from Salt's event bus.",
   "type": "object",
   "properties": {
      "tag": {
         "title": "Tag",
         "type": "string"
      },
      "stamp": {
         "title": "Stamp",
         "type": "string",
         "format": "date-time"
      },
      "data": {
         "title": "Data",
         "type": "object"
      },
      "raw_data": {
         "title": "Raw Data",
         "type": "object"
      }
   },
   "required": [
      "tag",
      "stamp",
      "data",
      "raw_data"
   ]
}

Config:
  • allow_mutation: bool = False

Fields:
Validators:
  • _validate_stamp » stamp

field tag: str [Required]#
field stamp: datetime [Required]#
Validated by:
  • _validate_stamp

field data: Dict[str, Any] [Required]#
field raw_data: Dict[str, Any] [Required]#
pydantic model saf.models.PipelineRunContext[source]#

Bases: GenericModel, Generic[PipelineRunContextConfigType]

Class representing a pipeline run context.

Show JSON schema
{
   "title": "PipelineRunContext",
   "description": "Class representing a pipeline run context.",
   "type": "object",
   "properties": {
      "config": {
         "$ref": "#/definitions/NonMutableConfig"
      },
      "cache": {
         "title": "Cache",
         "type": "object"
      },
      "shared_cache": {
         "title": "Shared Cache",
         "type": "object"
      }
   },
   "required": [
      "config"
   ],
   "definitions": {
      "NonMutableConfig": {
         "title": "NonMutableConfig",
         "description": "Base class for non-mutable configurations.",
         "type": "object",
         "properties": {}
      }
   }
}

Fields:
field config: PipelineRunContextConfigType [Required]#
field cache: Dict[str, Any] [Optional]#
field shared_cache: Dict[str, Any] [Optional]#
property pipeline_config: AnalyticsConfig#

Return the analytics configuration.

property salt_config: Dict[str, Any]#

Return the salt configuration.

saf.pipeline module#

Salt Analytics Framework Pipeline.

class saf.pipeline.Pipeline(name: str, config: PipelineConfig)[source]#

Bases: object

Salt Analytics Pipeline.

async run() → None[source]#

Run the pipeline.

__enter__() → P[source]#

Enter pipeline context manager.

__exit__(*_: Any) → None[source]#

Exit pipeline context manager.

saf.plugins module#

Salt Analytics Framework Plugins Support.

class saf.plugins.PluginsList[source]#

Bases: object

Plugins manager.

__repr__() → str[source]#

Return a printable representation of the class instance.

static instance() → PluginsList[source]#

Return the cached instance of the plugins listing.

If it doesn’t exist yet, a new instance is created and cached.

load_plugins() → None[source]#

Load the available salt analytics framework plugins.

saf.plugins.catch_entry_points_exception(entry_point: str) → Generator[SimpleNamespace, None, None][source]#

Context manager to catch exceptions while loading entry points.

saf.version module#