saf.collect package#

Submodules#

saf.collect.beacons module#

The beacons collect plugin exists as an implementation example.

It listens on Salt’s event bus for beacon events and generates analytics events from them.

class saf.collect.beacons.BeaconsConfig(*, plugin: str, beacons: List[str])[source]#

Bases: CollectConfigBase

Configuration schema for the beacons collect plugin.

beacons: List[str]#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'beacons': FieldInfo(annotation=List[str], required=True), 'plugin': FieldInfo(annotation=str, required=True)}#

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.
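The `'frozen': True` entry in `model_config` means a configuration instance is not meant to be modified once it has been created. The sketch below illustrates that idea with a frozen dataclass from the standard library. It is only an analogue: `FrozenBeaconsConfig` and the field values are hypothetical, and the real `BeaconsConfig` is a Pydantic model, where a rejected assignment should surface as a Pydantic validation error rather than `FrozenInstanceError`.

```python
from dataclasses import FrozenInstanceError, dataclass
from typing import Tuple


@dataclass(frozen=True)
class FrozenBeaconsConfig:
    """Hypothetical stdlib analogue of BeaconsConfig (not the real class)."""

    plugin: str
    beacons: Tuple[str, ...]


# Illustrative values only; the plugin name is an assumption.
config = FrozenBeaconsConfig(plugin="beacons", beacons=("memusage",))

try:
    config.plugin = "other"  # assignment on a frozen instance is rejected
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Treating configuration as immutable makes it safe to share one instance across the whole pipeline run.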

class saf.collect.beacons.BeaconCollectedEvent(*, data: Mapping[str, Any], timestamp: Optional[datetime] = None, beacon: str, tag: str, stamp: datetime, raw_data: Dict[str, Any])[source]#

Bases: CollectedEvent

Beacons collected event.

beacon: str#
tag: str#
stamp: datetime#
raw_data: Dict[str, Any]#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'beacon': FieldInfo(annotation=str, required=True), 'data': FieldInfo(annotation=Mapping[str, Any], required=True), 'raw_data': FieldInfo(annotation=Dict[str, Any], required=True), 'stamp': FieldInfo(annotation=datetime, required=True), 'tag': FieldInfo(annotation=str, required=True), 'timestamp': FieldInfo(annotation=Union[datetime, NoneType], required=False, default_factory=utcnow)}#

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

saf.collect.beacons.get_config_schema() → Type[BeaconsConfig][source]#

Get the beacons plugin configuration schema.

async saf.collect.beacons.collect(*, ctx: PipelineRunContext[BeaconsConfig]) → AsyncIterator[BeaconCollectedEvent][source]#

Method called to collect events.
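The signature describes an async generator: the caller iterates it and receives one event per matching beacon message. The following is a minimal, standard-library-only sketch of that shape, not the plugin's actual implementation. It assumes a hypothetical in-memory `asyncio.Queue` in place of Salt's event bus, a hypothetical `BeaconEvent` dataclass in place of `BeaconCollectedEvent`, and a tag layout of `salt/beacon/<minion id>/<beacon name>/`.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple


@dataclass
class BeaconEvent:
    """Hypothetical stand-in for BeaconCollectedEvent (the real class is a Pydantic model)."""

    beacon: str
    tag: str
    stamp: datetime
    raw_data: Dict[str, Any]


async def collect_beacons(
    bus: "asyncio.Queue[Optional[Tuple[str, Dict[str, Any]]]]",
    beacons: List[str],
) -> AsyncIterator[BeaconEvent]:
    """Yield one event per bus message whose beacon name is listed in ``beacons``."""
    while True:
        item = await bus.get()
        if item is None:  # sentinel used only by this sketch to end the loop
            return
        tag, payload = item
        # Assumed tag layout: salt/beacon/<minion id>/<beacon name>/...
        parts = tag.split("/")
        if len(parts) < 4 or parts[:2] != ["salt", "beacon"]:
            continue
        name = parts[3]
        if name in beacons:
            yield BeaconEvent(
                beacon=name,
                tag=tag,
                stamp=datetime.now(timezone.utc),
                raw_data=payload,
            )


async def demo() -> List[BeaconEvent]:
    bus: "asyncio.Queue[Optional[Tuple[str, Dict[str, Any]]]]" = asyncio.Queue()
    await bus.put(("salt/beacon/minion-1/memusage/", {"memusage": 42.0}))
    await bus.put(("salt/beacon/minion-1/diskusage/", {"/": 71.5}))
    await bus.put(None)
    # Only the "memusage" beacon is configured, so "diskusage" is skipped.
    return [event async for event in collect_beacons(bus, ["memusage"])]


events = asyncio.run(demo())
```

Filtering on the configured beacon names mirrors the role of the `beacons` list in `BeaconsConfig`.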

saf.collect.file module#

A file collector plugin.

class saf.collect.file.CollectedLineData[source]#

Bases: TypedDict

Collected event line data definition.

line: str#
source: Path#

class saf.collect.file.CollectedLineEvent(*, data: CollectedLineData, timestamp: Optional[datetime] = None, backfill: bool = False)[source]#

Bases: CollectedEvent

Collected line event definition.

data: CollectedLineData#
backfill: bool#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'backfill': FieldInfo(annotation=bool, required=False, default=False), 'data': FieldInfo(annotation=CollectedLineData, required=True), 'timestamp': FieldInfo(annotation=Union[datetime, NoneType], required=False, default_factory=utcnow)}#

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

class saf.collect.file.FileCollectConfig(*, plugin: str, paths: List[Path], backfill: bool = False)[source]#

Bases: CollectConfigBase

Configuration schema for the file collect plugin.

paths: List[pathlib.Path]#
backfill: bool#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'backfill': FieldInfo(annotation=bool, required=False, default=False), 'paths': FieldInfo(annotation=List[pathlib.Path], required=True), 'plugin': FieldInfo(annotation=str, required=True)}#

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

saf.collect.file.get_config_schema() → Type[FileCollectConfig][source]#

Get the file collect plugin configuration schema.

async saf.collect.file.collect(*, ctx: PipelineRunContext[FileCollectConfig]) → AsyncIterator[CollectedLineEvent][source]#

Method called to collect file contents.
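As a rough illustration of the data this collector produces, the sketch below reads lines from a list of paths and yields dictionaries with the same two keys as `CollectedLineData` (`line` and `source`). `LineData` and `collect_lines` are hypothetical names. The sketch only reads content that already exists in the file; it does not model the `backfill` flag or following a file as it grows, so it should not be read as the plugin's actual behaviour.

```python
import asyncio
import tempfile
from pathlib import Path
from typing import AsyncIterator, List, TypedDict


class LineData(TypedDict):
    """Hypothetical mirror of CollectedLineData: same two keys."""

    line: str
    source: Path


async def collect_lines(paths: List[Path]) -> AsyncIterator[LineData]:
    """Yield every line currently present in each file, tagged with its source path."""
    for path in paths:
        with path.open(encoding="utf-8") as handle:
            for line in handle:
                yield LineData(line=line.rstrip("\n"), source=path)
                await asyncio.sleep(0)  # let other tasks run between lines


async def demo() -> List[LineData]:
    with tempfile.TemporaryDirectory() as tmp:
        log = Path(tmp) / "example.log"
        log.write_text("first\nsecond\n", encoding="utf-8")
        return [item async for item in collect_lines([log])]


lines = asyncio.run(demo())
```

Keeping the source path next to each line lets later pipeline stages tell apart lines collected from different files.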

saf.collect.test module#

Test collect plugin.

The test collect plugin exists as an implementation example and as a means of testing the salt-analytics-framework.

class saf.collect.test.TestCollectConfig(*, plugin: str, interval: float = 0.1, count: int = 9223372036854775807)[source]#

Bases: CollectConfigBase

Test collector configuration.

interval: float#
count: int#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'count': FieldInfo(annotation=int, required=False, default=9223372036854775807, metadata=[Gt(gt=0), Lt(lt=9223372036854775807)]), 'interval': FieldInfo(annotation=float, required=False, default=0.1, metadata=[Gt(gt=0)]), 'plugin': FieldInfo(annotation=str, required=True)}#

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

saf.collect.test.get_config_schema() → Type[TestCollectConfig][source]#

Get the test collect plugin configuration schema.

async saf.collect.test.collect(*, ctx: PipelineRunContext[TestCollectConfig]) → AsyncIterator[CollectedEvent][source]#

Method called to collect events; in this case, it generates them.
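A generator of this kind can be sketched with the standard library alone. The example below emits `count` events, pausing `interval` seconds after each one. It uses the same defaults as `TestCollectConfig` (0.1 seconds, and 9223372036854775807, which equals `sys.maxsize` on 64-bit platforms). The function name `generate` and the `{"ticks": n}` payload are assumptions made for illustration, not the plugin's actual output.

```python
import asyncio
import sys
from typing import AsyncIterator, Dict, List


async def generate(
    interval: float = 0.1, count: int = sys.maxsize
) -> AsyncIterator[Dict[str, int]]:
    """Yield ``count`` synthetic events, sleeping ``interval`` seconds after each."""
    # Both fields carry a "greater than 0" constraint in the config schema.
    if interval <= 0:
        raise ValueError("interval must be greater than 0")
    if count <= 0:
        raise ValueError("count must be greater than 0")
    ticks = 0
    while ticks < count:
        ticks += 1
        yield {"ticks": ticks}  # hypothetical payload shape
        await asyncio.sleep(interval)


async def demo() -> List[Dict[str, int]]:
    # A short interval and a small count keep the demonstration quick.
    return [event async for event in generate(interval=0.01, count=3)]


generated = asyncio.run(demo())
```

With the default `count`, such a generator effectively runs until the pipeline is stopped.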

saf.collect.salt_exec module#

A collect plugin that collects the output of a Salt execution module function.

class saf.collect.salt_exec.SaltExecConfig(*, plugin: str, interval: float = 5, fn: str = 'test.ping', args: List[Any] = None, kwargs: Dict[str, Any] = None)[source]#

Bases: CollectConfigBase

Configuration schema for the salt_exec collect plugin.

interval: float#
fn: str#
args: List[Any]#
kwargs: Dict[str, Any]#
model_config: ClassVar[ConfigDict] = {'frozen': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'args': FieldInfo(annotation=List[Any], required=False, default_factory=list), 'fn': FieldInfo(annotation=str, required=False, default='test.ping'), 'interval': FieldInfo(annotation=float, required=False, default=5), 'kwargs': FieldInfo(annotation=Dict[str, Any], required=False, default_factory=dict), 'plugin': FieldInfo(annotation=str, required=True)}#

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

saf.collect.salt_exec.get_config_schema() → Type[SaltExecConfig][source]#

Get the salt_exec plugin configuration schema.

async saf.collect.salt_exec.collect(*, ctx: PipelineRunContext[SaltExecConfig]) → AsyncIterator[CollectedEvent][source]#

Method called to collect events.
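The configuration fields (`fn`, `args`, `kwargs`, `interval`) suggest a polling loop: call the named function with the given arguments, emit the result, wait, and repeat. The sketch below shows that loop under stated assumptions. The `FUNCTIONS` registry is a hypothetical stand-in for Salt's execution-module loader, the `{"fn": ..., "ret": ...}` payload is illustrative, and the `limit` parameter exists only so the demonstration terminates.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, List, Optional

# Hypothetical registry standing in for Salt's execution-module loader.
FUNCTIONS: Dict[str, Callable[..., Any]] = {
    "test.ping": lambda: True,
    "test.echo": lambda text: text,
}


async def collect_exec(
    fn: str = "test.ping",
    args: Optional[List[Any]] = None,
    kwargs: Optional[Dict[str, Any]] = None,
    interval: float = 5,
    limit: Optional[int] = None,  # sketch-only: stop after this many calls
) -> AsyncIterator[Dict[str, Any]]:
    """Call ``fn`` every ``interval`` seconds and yield each return value."""
    call = FUNCTIONS[fn]
    call_args = list(args or [])      # missing args behave like an empty list
    call_kwargs = dict(kwargs or {})  # missing kwargs behave like an empty dict
    runs = 0
    while limit is None or runs < limit:
        runs += 1
        yield {"fn": fn, "ret": call(*call_args, **call_kwargs)}
        await asyncio.sleep(interval)


async def demo() -> List[Dict[str, Any]]:
    source = collect_exec("test.echo", args=["hello"], interval=0.01, limit=2)
    return [event async for event in source]


results = asyncio.run(demo())
```

Normalising `args` and `kwargs` to an empty list and dict matches the `default_factory=list` and `default_factory=dict` entries shown in `model_fields`.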