saf.collect package#
Submodules#
saf.collect.beacons module#
The beacons collect plugin exists as an implementation example.
It listens on Salt’s event bus for beacon events and generates analytics events from them.
- class saf.collect.beacons.BeaconsConfig(*, plugin: str, beacons: List[str])[source]#
Bases: CollectConfigBase
Configuration schema for the beacons collect plugin.
- model_config: ClassVar[ConfigDict] = {'frozen': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'beacons': FieldInfo(annotation=List[str], required=True), 'plugin': FieldInfo(annotation=str, required=True)}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class saf.collect.beacons.BeaconCollectedEvent(*, data: Mapping[str, Any], timestamp: Optional[datetime] = None, beacon: str, tag: str, stamp: datetime, raw_data: Dict[str, Any])[source]#
Bases: CollectedEvent
Beacons collected event.
- stamp: datetime#
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'beacon': FieldInfo(annotation=str, required=True), 'data': FieldInfo(annotation=Mapping[str, Any], required=True), 'raw_data': FieldInfo(annotation=Dict[str, Any], required=True), 'stamp': FieldInfo(annotation=datetime, required=True), 'tag': FieldInfo(annotation=str, required=True), 'timestamp': FieldInfo(annotation=Union[datetime, NoneType], required=False, default_factory=utcnow)}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- saf.collect.beacons.get_config_schema() -> Type[BeaconsConfig][source]#
Get the beacons plugin configuration schema.
- async saf.collect.beacons.collect(*, ctx: PipelineRunContext[BeaconsConfig]) -> AsyncIterator[BeaconCollectedEvent][source]#
Method called to collect events.
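The filtering step described for this plugin can be sketched as follows. This is a minimal illustration and not the plugin's actual implementation: the event bus is simulated with a plain list, BeaconEventSketch and collect_beacons are hypothetical names, and the tag layout salt/beacon/<minion_id>/<beacon_name>/ is an assumption about how beacon events are tagged.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, AsyncIterator, Dict, Iterable, List


@dataclass
class BeaconEventSketch:
    """Hypothetical stand-in for BeaconCollectedEvent (not the real model)."""

    beacon: str
    tag: str
    stamp: datetime
    raw_data: Dict[str, Any]


async def collect_beacons(
    bus_events: Iterable[Dict[str, Any]], beacons: List[str]
) -> AsyncIterator[BeaconEventSketch]:
    """Yield one event per bus message whose beacon name is in `beacons`."""
    wanted = set(beacons)
    for message in bus_events:
        tag = message["tag"]
        parts = tag.split("/")
        # Assumed tag layout: salt/beacon/<minion_id>/<beacon_name>/...
        if len(parts) < 4 or parts[:2] != ["salt", "beacon"]:
            continue
        beacon_name = parts[3]
        if beacon_name not in wanted:
            continue
        yield BeaconEventSketch(
            beacon=beacon_name,
            tag=tag,
            stamp=datetime.now(timezone.utc),
            raw_data=message["data"],
        )
        # Hand control back to the event loop between events.
        await asyncio.sleep(0)


async def run_beacons_demo() -> List[str]:
    # Simulated bus traffic: two beacon events and one unrelated job event.
    bus = [
        {"tag": "salt/beacon/minion-1/memusage/", "data": {"memusage": 71.2}},
        {"tag": "salt/job/2024/ret/minion-1", "data": {}},
        {"tag": "salt/beacon/minion-1/diskusage/", "data": {"diskusage": 90}},
    ]
    return [event.beacon async for event in collect_beacons(bus, ["memusage"])]


beacon_names = asyncio.run(run_beacons_demo())
```

Only the memusage event passes the filter here, which mirrors the role the beacons list plays in BeaconsConfig.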
saf.collect.file module#
A file collector plugin.
- class saf.collect.file.CollectedLineData[source]#
Bases: TypedDict
Collected event line data definition.
- class saf.collect.file.CollectedLineEvent(*, data: CollectedLineData, timestamp: Optional[datetime] = None, backfill: bool = False)[source]#
Bases: CollectedEvent
Collected line event definition.
- data: CollectedLineData#
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'backfill': FieldInfo(annotation=bool, required=False, default=False), 'data': FieldInfo(annotation=CollectedLineData, required=True), 'timestamp': FieldInfo(annotation=Union[datetime, NoneType], required=False, default_factory=utcnow)}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class saf.collect.file.FileCollectConfig(*, plugin: str, paths: List[Path], backfill: bool = False)[source]#
Bases: CollectConfigBase
Configuration schema for the file collect plugin.
- paths: List[pathlib.Path]#
- model_config: ClassVar[ConfigDict] = {'frozen': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'backfill': FieldInfo(annotation=bool, required=False, default=False), 'paths': FieldInfo(annotation=List[pathlib.Path], required=True), 'plugin': FieldInfo(annotation=str, required=True)}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- saf.collect.file.get_config_schema() -> Type[FileCollectConfig][source]#
Get the file collect plugin configuration schema.
- async saf.collect.file.collect(*, ctx: PipelineRunContext[FileCollectConfig]) -> AsyncIterator[CollectedLineEvent][source]#
Method called to collect file contents.
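A rough sketch of how a line collector with a backfill flag might behave is shown below. The semantics are an assumption: here backfill=True reads content already present in the file, while backfill=False starts at the end of the file and so sees none of it. The function name collect_lines and the dictionary keys source and line are hypothetical and do not reflect the real CollectedLineData fields. The sketch makes a single pass and does not keep following the file.

```python
import asyncio
import os
import tempfile
from pathlib import Path
from typing import AsyncIterator, Dict, List, Tuple


async def collect_lines(
    paths: List[Path], backfill: bool = False
) -> AsyncIterator[Dict[str, str]]:
    """Yield one dictionary per line read from each path (single pass)."""
    for path in paths:
        with path.open("r", encoding="utf-8") as handle:
            if not backfill:
                # Assumed behaviour: skip whatever the file already contains.
                handle.seek(0, os.SEEK_END)
            for line in handle:
                yield {"source": str(path), "line": line.rstrip("\n")}
                # Hand control back to the event loop between lines.
                await asyncio.sleep(0)


async def run_file_demo() -> Tuple[List[str], List[str]]:
    with tempfile.TemporaryDirectory() as tmp:
        log_path = Path(tmp) / "example.log"
        log_path.write_text("first\nsecond\n", encoding="utf-8")
        backfilled = [
            item["line"] async for item in collect_lines([log_path], backfill=True)
        ]
        fresh = [
            item["line"] async for item in collect_lines([log_path], backfill=False)
        ]
    return backfilled, fresh


backfilled_lines, fresh_lines = asyncio.run(run_file_demo())
```

With backfill enabled the two existing lines are returned; without it the sketch returns nothing, because no new lines are written after the collector starts.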
saf.collect.test module#
Test collect plugin.
The test collect plugin exists as an implementation example and as a means of testing the salt-analytics-framework.
- class saf.collect.test.TestCollectConfig(*, plugin: str, interval: float = 0.1, count: int = 9223372036854775807)[source]#
Bases: CollectConfigBase
Test collector configuration.
- model_config: ClassVar[ConfigDict] = {'frozen': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'count': FieldInfo(annotation=int, required=False, default=9223372036854775807, metadata=[Gt(gt=0), Lt(lt=9223372036854775807)]), 'interval': FieldInfo(annotation=float, required=False, default=0.1, metadata=[Gt(gt=0)]), 'plugin': FieldInfo(annotation=str, required=True)}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- saf.collect.test.get_config_schema() -> Type[TestCollectConfig][source]#
Get the test collect plugin configuration schema.
- async saf.collect.test.collect(*, ctx: PipelineRunContext[TestCollectConfig]) -> AsyncIterator[CollectedEvent][source]#
Method called to collect events; in this case, the plugin generates them itself.
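The interval and count settings suggest a generator loop along the following lines. This is a sketch under stated assumptions, not the plugin's code: generate_events and the tick payload are hypothetical, and whether the real plugin sleeps before or after each event is not documented here. The default count matches sys.maxsize on 64-bit CPython, which equals the documented default of 9223372036854775807.

```python
import asyncio
import sys
from typing import Any, AsyncIterator, Dict, List


async def generate_events(
    interval: float = 0.1, count: int = sys.maxsize
) -> AsyncIterator[Dict[str, Any]]:
    """Yield `count` synthetic events, pausing `interval` seconds after each."""
    # Mirror the documented constraints: both values must be greater than zero.
    if interval <= 0:
        raise ValueError("interval must be greater than 0")
    if count <= 0:
        raise ValueError("count must be greater than 0")
    ticks = 0
    while ticks < count:
        ticks += 1
        yield {"tick": ticks}  # hypothetical payload
        await asyncio.sleep(interval)


async def run_generate_demo() -> List[int]:
    # Small values keep the demonstration short.
    return [event["tick"] async for event in generate_events(interval=0.001, count=3)]


generated_ticks = asyncio.run(run_generate_demo())
```

Leaving count at its default makes the loop effectively endless, which suits a plugin meant to keep a pipeline busy during testing.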
saf.collect.salt_exec module#
A collect plugin that collects the output of a Salt execution module.
- class saf.collect.salt_exec.SaltExecConfig(*, plugin: str, interval: float = 5, fn: str = 'test.ping', args: List[Any] = None, kwargs: Dict[str, Any] = None)[source]#
Bases: CollectConfigBase
Configuration schema for the salt_exec collect plugin.
- args: List[Any]#
- model_config: ClassVar[ConfigDict] = {'frozen': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'args': FieldInfo(annotation=List[Any], required=False, default_factory=list), 'fn': FieldInfo(annotation=str, required=False, default='test.ping'), 'interval': FieldInfo(annotation=float, required=False, default=5), 'kwargs': FieldInfo(annotation=Dict[str, Any], required=False, default_factory=dict), 'plugin': FieldInfo(annotation=str, required=True)}#
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- saf.collect.salt_exec.get_config_schema() -> Type[SaltExecConfig][source]#
Get the salt_exec plugin configuration schema.
- async saf.collect.salt_exec.collect(*, ctx: PipelineRunContext[SaltExecConfig]) -> AsyncIterator[CollectedEvent][source]#
Method called to collect events.
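The fn, args, kwargs and interval settings point to a polling loop, which might look roughly like this. The sketch makes several assumptions: a plain Python callable stands in for the Salt execution module function, poll_function and fake_ping are hypothetical names, the ret key is an invented payload shape, and max_runs exists only to keep the demonstration finite.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, List, Optional


async def poll_function(
    func: Callable[..., Any],
    *,
    interval: float = 5,
    args: Optional[List[Any]] = None,
    kwargs: Optional[Dict[str, Any]] = None,
    max_runs: Optional[int] = None,
) -> AsyncIterator[Dict[str, Any]]:
    """Call `func` repeatedly and yield each return value as an event."""
    # Treat a missing args/kwargs value as empty, like the documented
    # default_factory=list and default_factory=dict.
    call_args = list(args or [])
    call_kwargs = dict(kwargs or {})
    runs = 0
    while max_runs is None or runs < max_runs:
        result = func(*call_args, **call_kwargs)
        yield {"ret": result}  # hypothetical payload shape
        runs += 1
        await asyncio.sleep(interval)


def fake_ping() -> bool:
    """Stand-in for an execution module function such as test.ping."""
    return True


async def run_poll_demo() -> List[Any]:
    return [
        event["ret"]
        async for event in poll_function(fake_ping, interval=0.001, max_runs=2)
    ]


poll_results = asyncio.run(run_poll_demo())
```

In this sketch the callable runs synchronously inside the loop; a long-running function would block other pipeline work unless it were moved to a thread or executor.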