Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions sigma/processing/conditions/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,31 @@ class LogsourceCondition(RuleProcessingCondition):
the condition returns true if any of the associated rules have the required log source fields.
"""

class_uid: Optional[str] = field(default=None)
category: Optional[str] = field(default=None)
product: Optional[str] = field(default=None)
service: Optional[str] = field(default=None)

def __post_init__(self) -> None:
self.logsource = SigmaLogSource(self.category, self.product, self.service)
self.logsource = SigmaLogSource(
self.category,
self.product,
self.service,
custom_attributes={"class_uid": self.class_uid},
)

def match(
self,
rule: Union[SigmaRule, SigmaCorrelationRule],
) -> bool:
if isinstance(rule, SigmaRule):
return rule.logsource in self.logsource
res = (
str(rule.logsource.category) == str(self.logsource.category)
and str(rule.logsource.product) == str(self.logsource.product)
and str(rule.logsource.service) == str(self.logsource.service)
)
res = res and self.match_ocsf(rule=rule)
return res
elif isinstance(rule, SigmaCorrelationRule):
# Will only return true if the rules have been resolved in advance
for ref in rule.rules:
Expand All @@ -49,6 +61,14 @@ def match(
return True
return False

def match_ocsf(self, rule: SigmaRule) -> bool:
rule_ocsf = rule.custom_attributes.get("ocsf")
if rule_ocsf:
res = str(rule_ocsf["class_uid"]) == str(self.logsource.custom_attributes["class_uid"])
else:
res = True
return res


@dataclass
class RuleContainsFieldCondition(RuleDetectionItemCondition):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_processing_conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def test_processing_condition_multiple_pipelines_set(dummy_processing_pipeline):


def test_logsource_match(sigma_rule):
assert LogsourceCondition(category="test_category").match(
assert not LogsourceCondition(category="test_category").match(
sigma_rule,
)

Expand Down
Binary file modified wheels/pysigma-0.11.23-py3-none-any.whl
Binary file not shown.