diff --git a/.gitignore b/.gitignore index 45c7508a..88a1e6c5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,28 +1,37 @@ -.DS_Store -**/venv/* -**/venv3/* +# build __pycache__ **/dist/* **/build/* -**/pkg_build/* -**/.ruff_cache/* -*.log -*.log.* -*.csv -.tox -rfalerts_*.json -rfplaybookalerts_*.json dist/ -.vscode *.pyc *.egg-info *.tar.gz *.tgz +site/ + +# client +.DS_Store +.vscode +.idea/ + + +# tets and coverage +**/.ruff_cache/* +*.log +*.log.* +*.csv result.xml .coverage tests/stix2/tests tests/output/* -virt/ +alerts/ +attachments/ +rules/ +risklists/ +bundles/ + + +# docs docs/examples/classic_alerts/alerts docs/examples/playbook_alerts/alerts docs/examples/analyst_notes/attachments @@ -31,11 +40,4 @@ docs/examples/enrich/enrich docs/examples/risklists/risklists docs/examples/stix2/bundles -alerts/ -attachments/ -rules/ -risklists/ -bundles/ - -site/ diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 984f4997..4156d32a 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## v2.8.0 - 2026-05-22 + +- Added support for Links API via the `LinksMgr`. + ## v2.7.0 - 2026-05-01 - Added support for Threat Map and Malware Map via the `ThreatMapMgr`. diff --git a/docs/api/links/errors.md b/docs/api/links/errors.md new file mode 100644 index 00000000..4db337f4 --- /dev/null +++ b/docs/api/links/errors.md @@ -0,0 +1 @@ +::: psengine.links.errors \ No newline at end of file diff --git a/docs/api/links/links.md b/docs/api/links/links.md new file mode 100644 index 00000000..9d39fa9c --- /dev/null +++ b/docs/api/links/links.md @@ -0,0 +1 @@ +::: psengine.links.links diff --git a/docs/api/links/links_mgr.md b/docs/api/links/links_mgr.md new file mode 100644 index 00000000..b287756f --- /dev/null +++ b/docs/api/links/links_mgr.md @@ -0,0 +1,8 @@ +::: psengine.links.links_mgr.LinksMgr + options: + members: + - __init__ + - list_sections + - list_events + - list_entity_types + - search diff --git a/docs/api/links/models.md b/docs/api/links/models.md new file mode 100644 index 00000000..ba4d9a55 --- /dev/null +++ b/docs/api/links/models.md @@ -0,0 +1 @@ +::: psengine.links.models diff --git a/docs/examples/links/__init__.py b/docs/examples/links/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/docs/examples/links/example_1.py b/docs/examples/links/example_1.py new file mode 100644 index 00000000..45c4ac06 --- /dev/null +++ b/docs/examples/links/example_1.py @@ -0,0 +1,16 @@ +from psengine.links import LinksMgr + +mgr = LinksMgr() + +results = mgr.search(entities='QCwdoU') + +for result in results: + if result.error: + print(f'Failed: {result.error.message}') + continue + + entity = result.entity + print(f'Entity: {entity.name}') + + for link in result.links[:5]: + print(f' -> {link.name} source: {link.source}') diff --git a/docs/examples/links/example_2.py b/docs/examples/links/example_2.py new file mode 100644 index 00000000..101afc2f --- /dev/null +++ b/docs/examples/links/example_2.py @@ -0,0 +1,20 @@ +from psengine.links import LinksMgr + +mgr = LinksMgr() + +results = mgr.search( + entities='I60vfZ', + sources='technical', + entity_types='type:Malware', + timeframe='-90d', + search_scope='small', + per_entity_type=50, +) + +for result in results: + if result.error: + print(f'Failed: {result.error.message}') + continue + + for link in result.links: + print(f'{link.name} ({link.type_})') diff --git a/docs/examples/links/example_3.py b/docs/examples/links/example_3.py new file mode 100644 index 00000000..09390f50 --- /dev/null +++ b/docs/examples/links/example_3.py @@ -0,0 +1,32 @@ +from psengine.links import LinksMgr + +mgr = LinksMgr() + +results = mgr.search(entities=['QCwdoU']) + +for result in results: + if result.error: + print(f'Failed: {result.error.message}') + continue + + print(f'Entity: {result.entity.name}') + + print('\nIOCs grouped by type:') + for ioc_type, iocs in result.iocs().items(): + print(f' {ioc_type}: {len(iocs)}') + for ioc in iocs[:3]: + print( + f' - {ioc.name} score:{ioc.risk_score}' + ) + + print('\nTTPs:') + for ttp in result.ttps()[:5]: + print(f' - {ttp.name} ({ttp.display_name})') + + print('\nMalwares:') + for malware in result.malwares()[:5]: + print(f' - {malware.name}') + + print('\nThreat actors:') + for threat_actor in result.threat_actors()[:5]: + print(f' - {threat_actor.name}') diff --git a/docs/modules/links.md b/docs/modules/links.md new file mode 100644 index 00000000..dbc687ca --- /dev/null +++ b/docs/modules/links.md @@ -0,0 +1,96 @@ +## Introduction + +The `LinksMgr` class of the `links` module allows you to search for technically validated relationships between threat intelligence entities in the Recorded Future Intelligence Cloud — connections established through sandbox analysis, infrastructure analysis, network traffic analysis, and Insikt Group research. + +See the [**API Reference**](../api/links/links_mgr.md) for internal details of the module. + +## Notes + +The `search` method expects Recorded Future entity IDs (for example, `QCwdoU`), not entity names. If you only have a name, use the `entity_match` module first to resolve the ID. + +The response is batched: you get one result per input entity. A specific entity can fail while others succeed, so always check `result.error` before iterating over `result.links`. + +For filters such as sections, events, and entity types, use the metadata methods (`list_sections`, `list_events`, and `list_entity_types`) to retrieve valid IDs before calling `search`. + +## Examples + +{! modules/_includes/examples_warning.md !} + +#### 1: Search links for an entity and handle per-entity errors + +In this example, we call `search` with a single entity ID. For each result, we first check `result.error`. If there is no error, we print the source entity and the first 5 linked entities returned by the API. + +```python +--8<-- "docs/examples/links/example_1.py" +``` + +The output will be: + +``` +Entity: Lazarus Group + -> CVE-2022-47966 source: insikt + -> 24988feb1b38f400069acec4514aa4deea3f6ca8ceb5296f54926e2b22af1e5a source: insikt + -> 36db27f5eb3343cfc72d261d78da44957a49cb6731acb50a96ea5694f4d616c5 source: insikt + -> ffec6e6d4e314f64f5d31c62024252abde7f77acdd63991cb16923ff17828885 source: insikt + -> 3e5fd9acdab438ffc8b2cce48c91679d3f980d08f9dea47d5e1039d352cd64fb source: insikt +``` + + +#### 2: Filter link results and apply limits + +In this example, we pass filter and limit arguments directly to `search` (for example `sources`, `entity_types`, `timeframe`, `search_scope`, and `per_entity_type`) to narrow results to technical malware links seen in the last 90 days and cap result size per entity type. + +```python +--8<-- "docs/examples/links/example_2.py" +``` + +#### 3: Extract IOCs, Threat Actors and Malwares from links + +Each item returned by `LinksMgr.search()` is an `EntityLinks` object. After checking `result.error`, you can use helper methods to extract common subsets from `result.links`: + +- `result.iocs()`: returns linked IOCs grouped by IOC type (`type:InternetDomainName`, `type:CyberVulnerability`, `type:IpAddress`, `type:Hash`, `type:Url`). Each IOC item includes `id`, `type`, `name`, `risk_score`, and `source`. +- `result.ttps()`: returns linked MITRE ATT&CK techniques (`type:MitreAttackIdentifier`). Each item includes `id`, `type`, `name`, `display_name`, and `source`. +- `result.malwares()`: returns linked malware entities (`type:Malware`) with `id`, `type`, `name`, and `source`. +- `result.threat_actors()`: returns linked organizations (`type:Organization`) that are marked as threat actors through the `threat_actor` attribute. + +```python +--8<-- "docs/examples/links/example_3.py" +``` + +This will output: + +``` +Entity: Lazarus Group + +IOCs grouped by type: + type:CyberVulnerability: 5 + - CVE-2022-47966 score:99 + - CVE-2019-3396 score:89 + - CVE-2022-0609 score:79 + type:Hash: 1273 + - c8706a586afa880fbf23ce662b7fc2925fd0384b8cde0a40f3c00a182c5a3d06 score:89 + - 9ab05d771d6face27502052af8e3a945ad66e3c6726a2dda637fa12641e2bca2 score:89 + - 084f904249f8655e925b4b330426df6b979cd3f630f7765dac405f2144755738 score:89 + type:InternetDomainName: 144 + - calendly.live score:74 + - pypilibrary.com score:72 + - test-wolf.com score:69 + type:IpAddress: 29 + - 172.96.137.224 score:55 + - 103.231.75.101 score:38 + - 23.27.140.49 score:32 + +TTPs: + - T1082 (T1082 (System Information Discovery)) + - T1071 (T1071 (Application Layer Protocol)) + - TA0010 (TA0010 (Exfiltration)) + - T1027 (T1027 (Obfuscated Files or Information)) + - T1497.001 (T1497.001 (Virtualization/Sandbox Evasion: System Checks)) + +Malwares: + - CIPHERROCKET + - QuiteRAT + - PondRAT + - Trickbot + - NukeSped +``` diff --git a/mkdocs.yml b/mkdocs.yml index 7d2f6f74..80fa5e9c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -101,6 +101,7 @@ nav: - Entity Match: modules/entity_match.md - Fusion: modules/fusion.md - Identity: modules/identity.md + - Links: modules/links.md - Malware Intelligence: modules/malware_intel.md - Playbook Alerts: modules/playbook_alerts.md - Risk History: modules/risk_history.md @@ -191,6 +192,11 @@ nav: - ADT: api/identity/identity.md - Constants: api/identity/constants.md - Errors: api/identity/errors.md + - Links: + - Manager: api/links/links_mgr.md + - ADT: api/links/links.md + - Models: api/links/models.md + - Errors: api/links/errors.md - Logger: - Logger: api/logger/rf_logger.md - Constants: api/logger/constants.md diff --git a/psengine/analyst_notes/note_mgr.py b/psengine/analyst_notes/note_mgr.py index ee3c791d..26b93144 100644 --- a/psengine/analyst_notes/note_mgr.py +++ b/psengine/analyst_notes/note_mgr.py @@ -53,12 +53,11 @@ class AnalystNoteMgr: """Manages requests for Recorded Future analyst notes.""" - def __init__(self, rf_token: str = None): - """Initializes the `AnalystNoteMgr` object. - - Args: - rf_token (str, optional): Recorded Future API token. - """ + def __init__( + self, + rf_token: Annotated[str | None, Doc('Recorded Future API token.')] = None, + ): + """Initializes the `AnalystNoteMgr` object.""" self.log = logging.getLogger(__name__) self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient() diff --git a/psengine/asi/asi_mgr.py b/psengine/asi/asi_mgr.py index d706d018..3ca4f118 100644 --- a/psengine/asi/asi_mgr.py +++ b/psengine/asi/asi_mgr.py @@ -95,12 +95,11 @@ def _validate_exposure_score_range(v: tuple[int, int]) -> tuple[int, int]: class AttackSurfaceMgr: """Manages requests for Recorded Future SecurityTrails (ASI) API.""" - def __init__(self, api_token: str = None): - """Initializes the `AttackSurfaceMgr` object. - - Args: - api_token (str, optional): ASI API token. - """ + def __init__( + self, + api_token: Annotated[str | None, Doc('ASI API token.')] = None, + ): + """Initializes the `AttackSurfaceMgr` object.""" self.log = logging.getLogger(__name__) self.asi_client = ASIClient(api_token=api_token) if api_token else ASIClient() diff --git a/psengine/classic_alerts/classic_alert_mgr.py b/psengine/classic_alerts/classic_alert_mgr.py index 3d1fb667..2ab0eefc 100644 --- a/psengine/classic_alerts/classic_alert_mgr.py +++ b/psengine/classic_alerts/classic_alert_mgr.py @@ -43,12 +43,11 @@ class ClassicAlertMgr: """Alert Manager for Classic Alert (v3) API.""" - def __init__(self, rf_token: str = None): - """Initializes the ClassicAlertMgr object. - - Args: - rf_token (str, optional): Recorded Future API token. - """ + def __init__( + self, + rf_token: Annotated[str | None, Doc('Recorded Future API token.')] = None, + ): + """Initializes the ClassicAlertMgr object.""" self.log = logging.getLogger(__name__) self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient() diff --git a/psengine/collective_insights/collective_insights.py b/psengine/collective_insights/collective_insights.py index 5b64d87f..6b86e5c2 100644 --- a/psengine/collective_insights/collective_insights.py +++ b/psengine/collective_insights/collective_insights.py @@ -29,12 +29,11 @@ class CollectiveInsights: """Class for interacting with the Recorded Future Collective Insights API.""" - def __init__(self, rf_token: str = None): - """Initializes the CollectiveInsights object. - - Args: - rf_token (str, optional): Recorded Future API token. - """ + def __init__( + self, + rf_token: Annotated[str | None, Doc('Recorded Future API token.')] = None, + ): + """Initializes the CollectiveInsights object.""" self.log = logging.getLogger(__name__) self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient() diff --git a/psengine/endpoints.py b/psengine/endpoints.py index b41b51d8..85287868 100644 --- a/psengine/endpoints.py +++ b/psengine/endpoints.py @@ -136,6 +136,16 @@ EP_RISK_HISTORY_BASE = BASE_URL + '/risk' EP_RISK_HISTORY = EP_RISK_HISTORY_BASE + '/history' +############################################################################### +# Links API Endpoints +############################################################################### +LINKS_BASE_URL = f'{BASE_URL}/links' +LINKS_METADATA_URL = f'{LINKS_BASE_URL}/metadata' +EP_LINKS_SEARCH = f'{LINKS_BASE_URL}/search' +EP_LINKS_METADATA_SECTIONS = f'{LINKS_METADATA_URL}/sections' +EP_LINKS_METADATA_EVENTS = f'{LINKS_METADATA_URL}/events' +EP_LINKS_METADATA_ENTITIES = f'{LINKS_METADATA_URL}/entities' + ################################################################################ # Attack Surface Intelligence API Endpoints ################################################################################ diff --git a/psengine/fusion/fusion_mgr.py b/psengine/fusion/fusion_mgr.py index 4b2e7600..a926be67 100644 --- a/psengine/fusion/fusion_mgr.py +++ b/psengine/fusion/fusion_mgr.py @@ -36,12 +36,11 @@ class FusionMgr: """Manages requests for Recorded Future Fusion files.""" - def __init__(self, rf_token: str = None): - """Initializes the `FusionMgr` object. - - Args: - rf_token (str, optional): Recorded Future API token. - """ + def __init__( + self, + rf_token: Annotated[str | None, Doc('Recorded Future API token.')] = None, + ): + """Initializes the `FusionMgr` object.""" self.log = logging.getLogger(__name__) self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient() diff --git a/psengine/helpers/helpers.py b/psengine/helpers/helpers.py index 7f180974..ee9d9b13 100644 --- a/psengine/helpers/helpers.py +++ b/psengine/helpers/helpers.py @@ -504,6 +504,15 @@ def convert_relative_time( else input_time ) + @staticmethod + def is_rel_time_valid( + input_time: Annotated[str | None, Doc("Relative time string, e.g., '7d', '3h'.")], + ): + """Check that a relative time like `-3d` is valid.""" + if input_time is not None and not TimeHelpers.is_rel_time_valid(input_time): + raise ValueError(f'Invalid relative time: {input_time}') + return input_time + @staticmethod def check_uhash_prefix( value: Annotated[str | list, Doc('String or list of strings to check for uhash prefix.')], diff --git a/psengine/links/__init__.py b/psengine/links/__init__.py new file mode 100644 index 00000000..0abb25b2 --- /dev/null +++ b/psengine/links/__init__.py @@ -0,0 +1,38 @@ +##################################### TERMS OF USE ########################################### +# The following code is provided for demonstration purpose only, and should not be used # +# without independent verification. Recorded Future makes no representations or warranties, # +# express, implied, statutory, or otherwise, regarding any aspect of this code or of the # +# information it may retrieve, and provides it both strictly “as-is” and without assuming # +# responsibility for any information it may retrieve. Recorded Future shall not be liable # +# for, and you assume all risk of using, the foregoing. By using this code, Customer # +# represents that it is solely responsible for having all necessary licenses, permissions, # +# rights, and/or consents to connect to third party APIs, and that it is solely responsible # +# for having all necessary licenses, permissions, rights, and/or consents to any data # +# accessed from any third party API. # +############################################################################################## + +from .errors import ( + LinksError, + LinksMetadataError, + LinksSearchError, +) +from .links import ( + EntityLinks, + LinkedEntity, + LinkedIOC, + LinkedMalware, + LinkedTA, + LinkedTTP, +) +from .links_mgr import LinksMgr +from .models import ( + CriticalityAttribute, + EntityAttribute, + EntitySearchError, + GenericAttribute, + LinkSource, + MitreNameAttribute, + RiskAttribute, + SearchScope, + ThreatActorAttribute, +) diff --git a/psengine/links/errors.py b/psengine/links/errors.py new file mode 100644 index 00000000..2e1b5520 --- /dev/null +++ b/psengine/links/errors.py @@ -0,0 +1,26 @@ +##################################### TERMS OF USE ########################################### +# The following code is provided for demonstration purpose only, and should not be used # +# without independent verification. Recorded Future makes no representations or warranties, # +# express, implied, statutory, or otherwise, regarding any aspect of this code or of the # +# information it may retrieve, and provides it both strictly “as-is” and without assuming # +# responsibility for any information it may retrieve. Recorded Future shall not be liable # +# for, and you assume all risk of using, the foregoing. By using this code, Customer # +# represents that it is solely responsible for having all necessary licenses, permissions, # +# rights, and/or consents to connect to third party APIs, and that it is solely responsible # +# for having all necessary licenses, permissions, rights, and/or consents to any data # +# accessed from any third party API. # +############################################################################################## + +from ..errors import RecordedFutureError + + +class LinksError(RecordedFutureError): + """Base class for all exceptions raised by the Links module.""" + + +class LinksSearchError(LinksError): + """Error raised when a Links search request fails.""" + + +class LinksMetadataError(LinksError): + """Error raised when fetching or validating Links metadata fails.""" diff --git a/psengine/links/links.py b/psengine/links/links.py new file mode 100644 index 00000000..0447e22b --- /dev/null +++ b/psengine/links/links.py @@ -0,0 +1,152 @@ +##################################### TERMS OF USE ########################################### +# The following code is provided for demonstration purpose only, and should not be used # +# without independent verification. Recorded Future makes no representations or warranties, # +# express, implied, statutory, or otherwise, regarding any aspect of this code or of the # +# information it may retrieve, and provides it both strictly “as-is” and without assuming # +# responsibility for any information it may retrieve. Recorded Future shall not be liable # +# for, and you assume all risk of using, the foregoing. By using this code, Customer # +# represents that it is solely responsible for having all necessary licenses, permissions, # +# rights, and/or consents to connect to third party APIs, and that it is solely responsible # +# for having all necessary licenses, permissions, rights, and/or consents to any data # +# accessed from any third party API. # +############################################################################################## + + +from collections import defaultdict + +from ..common_models import IdNameType, RFBaseModel +from .models import EntityAttribute, EntitySearchError, LinksFilterObjects, LinksLimitsObjects + +LINK_IOC_TYPE = [ + 'type:InternetDomainName', + 'type:CyberVulnerability', + 'type:IpAddress', + 'type:Hash', + 'type:Url', +] + + +class LinkedIOC(IdNameType): + """Return linked iocs entities.""" + + risk_score: int + source: str | None = None + + +class LinkedTTP(IdNameType): + """Return linked TTPs entities.""" + + display_name: str + source: str | None = None + + +class LinkedTA(IdNameType): + """Return linked threat actors entities.""" + + source: str | None = None + + +class LinkedMalware(IdNameType): + """Return linked malware entities.""" + + source: str | None = None + + +class LinkedEntity(IdNameType): + """An entity connected to the search target.""" + + source: str | None = None + section: str | None = None + attributes: list[EntityAttribute] = [] + + +class EntityLinks(RFBaseModel): + """The result set for a single entity that was queried.""" + + entity: IdNameType | None = None + links: list[LinkedEntity] = [] + error: EntitySearchError | None = None + + def iocs(self) -> list[LinkedIOC]: + """Return linked indicators of compromise grouped by IOC type.""" + iocs = defaultdict(list) + for link in self.links: + if link.type_ in LINK_IOC_TYPE: + ioc_score = next( + (attr.value for attr in link.attributes if attr.id_ == 'risk_score'), + 0, + ) + iocs[link.type_].append( + LinkedIOC( + id=link.id_, + type=link.type_, + name=link.name, + risk_score=ioc_score, + source=link.source, + ) + ) + + return iocs + + def ttps(self) -> list[LinkedTTP]: + """Return linked MITRE ATT&CK techniques and their display names.""" + ttps = [] + for link in self.links: + if link.type_ == 'type:MitreAttackIdentifier': + display_name = next( + (attr.value for attr in link.attributes if attr.id_ == 'display_name'), + 'N/A', + ) + ttps.append( + LinkedTTP( + id=link.id_, + type=link.type_, + name=link.name, + display_name=display_name, + source=link.source, + ) + ) + + return ttps + + def threat_actors(self) -> list[LinkedTA]: + """Return linked organizations marked as threat actors.""" + tas = [] + for link in self.links: + if link.type_ == 'type:Organization': + is_threat_actor = next( + (attr.value for attr in link.attributes if attr.id_ == 'threat_actor'), + False, + ) + if is_threat_actor: + tas.append( + LinkedTA( + id=link.id_, + type=link.type_, + name=link.name, + source=link.source, + ) + ) + + return tas + + def malwares(self) -> list[LinkedMalware]: + """Return linked malware entities.""" + return [ + LinkedMalware( + id=link.id_, + type=link.type_, + name=link.name, + source=link.source, + ) + for link in self.links + if link.type_ == 'type:Malware' + ] + + +class LinksSearchIn(RFBaseModel): + """Model for payload sent to POST `/links/search` endpoint.""" + + entities: list[str] + filters: LinksFilterObjects | None = None + limits: LinksLimitsObjects | None = None diff --git a/psengine/links/links_mgr.py b/psengine/links/links_mgr.py new file mode 100644 index 00000000..a4dc50ba --- /dev/null +++ b/psengine/links/links_mgr.py @@ -0,0 +1,214 @@ +##################################### TERMS OF USE ########################################### +# The following code is provided for demonstration purpose only, and should not be used # +# without independent verification. Recorded Future makes no representations or warranties, # +# express, implied, statutory, or otherwise, regarding any aspect of this code or of the # +# information it may retrieve, and provides it both strictly “as-is” and without assuming # +# responsibility for any information it may retrieve. Recorded Future shall not be liable # +# for, and you assume all risk of using, the foregoing. By using this code, Customer # +# represents that it is solely responsible for having all necessary licenses, permissions, # +# rights, and/or consents to connect to third party APIs, and that it is solely responsible # +# for having all necessary licenses, permissions, rights, and/or consents to any data # +# accessed from any third party API. # +############################################################################################## + +import logging +from typing import Annotated + +from pydantic import AfterValidator, Field, validate_call +from typing_extensions import Doc + +from ..common_models import IdName +from ..endpoints import ( + EP_LINKS_METADATA_ENTITIES, + EP_LINKS_METADATA_EVENTS, + EP_LINKS_METADATA_SECTIONS, + EP_LINKS_SEARCH, +) +from ..helpers import connection_exceptions, debug_call +from ..helpers.helpers import Validators +from ..rf_client import RFClient +from .errors import LinksMetadataError, LinksSearchError +from .links import ( + EntityLinks, + LinksSearchIn, +) +from .models import ( + FilterTechnical, + LinksFilterObjects, + LinksLimitsObjects, + LinkSource, + SearchScope, +) + + +class LinksMgr: + """Manager for interacting with the Recorded Future Links API.""" + + def __init__( + self, + rf_token: Annotated[str | None, Doc('Recorded Future API token.')] = None, + ): + """Initialize the `LinksMgr` object.""" + self.log = logging.getLogger(__name__) + self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient() + + @debug_call + @validate_call + @connection_exceptions(ignore_status_code=[], exception_to_raise=LinksMetadataError) + def list_sections( + self, + ) -> Annotated[list[IdName], Doc('List of section objects with id and name.')]: + """List all sections that can be used to filter a Link search. + + Sections are the high-level categories the Links API groups results into, + for example *Actors, Tools & TTPs* or *Indicators & Detection Rules*. + + Endpoint: + `/links/metadata/sections` + + Raises: + ValidationError: If any supplied parameter is of incorrect type. + LinksMetadataError: If an API or connection error occurs. + """ + response = self.rf_client.request(method='GET', url=EP_LINKS_METADATA_SECTIONS) + return [IdName.model_validate(item) for item in response.json()['data']] + + @debug_call + @validate_call + @connection_exceptions(ignore_status_code=[], exception_to_raise=LinksMetadataError) + def list_events( + self, + ) -> Annotated[list[IdName], Doc('List of event objects with id and name.')]: + """List all event types that can be used to filter technical Link searches. + + Event types describe the kind of analytical evidence that produced a + technical link (for example `TTPAnalysis` or `InfrastructureAnalysis`). + + Endpoint: + `/links/metadata/events` + + Raises: + ValidationError: If any supplied parameter is of incorrect type. + LinksMetadataError: If an API or connection error occurs. + """ + response = self.rf_client.request(method='GET', url=EP_LINKS_METADATA_EVENTS) + return [IdName.model_validate(item) for item in response.json()['data']] + + @debug_call + @validate_call + @connection_exceptions(ignore_status_code=[], exception_to_raise=LinksMetadataError) + def list_entity_types( + self, + ) -> Annotated[list[IdName], Doc('List of entity-type objects with id and name.')]: + """List all entity types that can be used to filter a Link search. + + The returned values are the supported types for connected entities + (for example `Malware`, `Company`, `IpAddress`). + + Endpoint: + `/links/metadata/entities` + + Raises: + ValidationError: If any supplied parameter is of incorrect type. + LinksMetadataError: If an API or connection error occurs. + """ + response = self.rf_client.request(method='GET', url=EP_LINKS_METADATA_ENTITIES) + return [IdName.model_validate(item) for item in response.json()['data']] + + @debug_call + @validate_call + @connection_exceptions(ignore_status_code=[], exception_to_raise=LinksSearchError) + def search( + self, + entities: Annotated[ + Annotated[str | list[str], AfterValidator(Validators.convert_str_to_list)], + Doc('One or more Recorded Future entity IDs to look up links for.'), + ], + sections: Annotated[ + str | list[str] | None, Doc('Filter results to these link section IDs.') + ] = None, + entity_types: Annotated[ + str | list[str] | None, + Doc('Restrict linked entities to these entity types (e.g. "type:IpAddress").'), + ] = None, + sources: Annotated[ + str | list[LinkSource] | None, + Doc('Limit to source type(s): "technical", "insikt", or both if argument omitted.'), + ] = None, + timeframe: Annotated[ + str | None, + Doc('Technical-link timeframe (e.g. "-30d", default "-30d", max "-90d").'), + ] = None, + events: Annotated[ + str | list[str] | None, + Doc('Restrict technical links to these event types (e.g. "type:MalwareAnalysis").'), + ] = None, + connected_entities: Annotated[ + str | list[str] | None, + Doc('Only return technical links that connect to these entities.'), + ] = None, + search_scope: Annotated[ + SearchScope | None, + Doc('Result-volume scope: "small", "medium" (default), or "large".'), + ] = 'medium', + per_entity_type: Annotated[ + int | None, + Field(ge=1, le=1_000_000_000), + Doc('Max linked entities returned per entity type (>= 1 <= 1,000,000,000).'), + ] = None, + ) -> Annotated[ + list[EntityLinks], + Doc('A list of EntityLinks objects'), + ]: + """Search for technically validated relationships between threat intelligence + entities in the Recorded Future Intelligence Cloud — connections established + through sandbox analysis, infrastructure analysis, network traffic analysis, + and Insikt Group research. + + Issues a single batched request: the response contains one + `EntityLinks` per entity in `entities`, in the same order. If the + API failed for a specific entity, that result's `error` is populated + and `links` is empty — the rest of the batch still succeeds. + + Entities must be supplied as Recorded Future entity IDs; if you only have + a name, resolve it with `EntityMatchMgr` first. + + Endpoint: + `/links/search` + + If the API failed for a specific entity in the batch, its result looks like: + ```python + EntityLinks( + entity=IdNameType(id_='QCwdoU', name='...', type_='...'), + links=[], + error=EntitySearchError(message='...', status_code=404), + ) + ``` + + Raises: + ValidationError: If any supplied parameter is of incorrect type. + LinksSearchError: If an API or connection error occurs at the request level. + """ + technical_filters = FilterTechnical( + timeframe=timeframe, events=events, connected_entities=connected_entities + ).json() + + filters = LinksFilterObjects( + sections=sections, + entity_types=entity_types, + sources=sources, + technical=technical_filters or None, + ).json() + limits = LinksLimitsObjects( + search_scope=search_scope, per_entity_type=per_entity_type + ).json() + payload = LinksSearchIn( + entities=entities, + filters=filters or None, + limits=limits or None, + ).json() + + self.log.info(f'Executing links search for {len(entities)} entities.') + + response = self.rf_client.request(method='POST', url=EP_LINKS_SEARCH, data=payload) + return [EntityLinks.model_validate(item) for item in response.json()['data']] diff --git a/psengine/links/models.py b/psengine/links/models.py new file mode 100644 index 00000000..037e7f6f --- /dev/null +++ b/psengine/links/models.py @@ -0,0 +1,94 @@ +##################################### TERMS OF USE ########################################### +# The following code is provided for demonstration purpose only, and should not be used # +# without independent verification. Recorded Future makes no representations or warranties, # +# express, implied, statutory, or otherwise, regarding any aspect of this code or of the # +# information it may retrieve, and provides it both strictly “as-is” and without assuming # +# responsibility for any information it may retrieve. Recorded Future shall not be liable # +# for, and you assume all risk of using, the foregoing. By using this code, Customer # +# represents that it is solely responsible for having all necessary licenses, permissions, # +# rights, and/or consents to connect to third party APIs, and that it is solely responsible # +# for having all necessary licenses, permissions, rights, and/or consents to any data # +# accessed from any third party API. # +############################################################################################## + +from typing import Annotated, Any, Literal + +from pydantic import AfterValidator, BeforeValidator, Field + +from ..common_models import RFBaseModel +from ..helpers.helpers import Validators + + +class RiskAttribute(RFBaseModel): + id_: Literal['risk_score', 'risk_level'] = Field(alias='id') + value: float | str | None = None + + +class CriticalityAttribute(RFBaseModel): + id_: Literal['criticality'] = Field(alias='id') + value: str | None = None + + +class MitreNameAttribute(RFBaseModel): + id_: Literal['display_name'] = Field(alias='id') + value: str | None = None + + +class ThreatActorAttribute(RFBaseModel): + id_: Literal['threat_actor'] = Field(alias='id') + value: bool | None = None + + +class GenericAttribute(RFBaseModel): + id_: str = Field(alias='id') + value: Any + + +# Discriminated union for the 'attributes' array. Pydantic matches on the +# Literal 'id' values; unknown ids fall back to GenericAttribute. +EntityAttribute = ( + RiskAttribute + | CriticalityAttribute + | MitreNameAttribute + | ThreatActorAttribute + | GenericAttribute +) + + +class EntitySearchError(RFBaseModel): + message: str + status_code: int + + +LinkSource = Literal['technical', 'insikt'] +SearchScope = Literal['small', 'medium', 'large'] + + +class FilterTechnical(RFBaseModel): + """Fields in the Technical Object of Filters.""" + + timeframe: Annotated[str | None, AfterValidator(Validators.is_rel_time_valid)] = None + events: Annotated[list[str] | None, BeforeValidator(Validators.convert_str_to_list)] = None + connected_entities: Annotated[ + list[str] | None, BeforeValidator(Validators.convert_str_to_list) + ] = None + + +class LinksFilterObjects(RFBaseModel): + """Objects in the fields data parameter of links.""" + + sections: Annotated[list[str] | None, BeforeValidator(Validators.convert_str_to_list)] = None + entity_types: Annotated[list[str] | None, BeforeValidator(Validators.convert_str_to_list)] = ( + None + ) + sources: Annotated[list[LinkSource] | None, BeforeValidator(Validators.convert_str_to_list)] = ( + None + ) + technical: FilterTechnical | None = None + + +class LinksLimitsObjects(RFBaseModel): + """Objects in the limits object fields.""" + + search_scope: SearchScope | None = None + per_entity_type: Annotated[int, Field(ge=1, le=1_000_000_000)] | None = None diff --git a/psengine/malware_intel/auto_sigma_mgr.py b/psengine/malware_intel/auto_sigma_mgr.py index 4b2240ee..d9f44fff 100644 --- a/psengine/malware_intel/auto_sigma_mgr.py +++ b/psengine/malware_intel/auto_sigma_mgr.py @@ -51,12 +51,11 @@ class AutoSigmaMgr: """Manages requests for Recorded Future Malware Intelligence API Auto Sigma feature.""" - def __init__(self, rf_token: str = None): - """Initializes the `AutoSigmaMgr` object. - - Args: - rf_token (str, optional): Recorded Future API token. - """ + def __init__( + self, + rf_token: Annotated[str | None, Doc('Recorded Future API token.')] = None, + ): + """Initializes the `AutoSigmaMgr` object.""" self.log = logging.getLogger(__name__) self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient() diff --git a/psengine/malware_intel/auto_yara_mgr.py b/psengine/malware_intel/auto_yara_mgr.py index dae26039..0fa24061 100644 --- a/psengine/malware_intel/auto_yara_mgr.py +++ b/psengine/malware_intel/auto_yara_mgr.py @@ -49,12 +49,11 @@ class AutoYaraMgr: """Manages requests for Recorded Future Malware Intelligence API Auto YARA feature.""" - def __init__(self, rf_token: str = None): - """Initializes the `AutoYaraMgr` object. - - Args: - rf_token (str, optional): Recorded Future API token. - """ + def __init__( + self, + rf_token: Annotated[str | None, Doc('Recorded Future API token.')] = None, + ): + """Initializes the `AutoYaraMgr` object.""" self.log = logging.getLogger(__name__) self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient() diff --git a/psengine/malware_intel/malware_intel_mgr.py b/psengine/malware_intel/malware_intel_mgr.py index ec1dfd06..ad89c39a 100644 --- a/psengine/malware_intel/malware_intel_mgr.py +++ b/psengine/malware_intel/malware_intel_mgr.py @@ -29,12 +29,11 @@ class MalwareIntelMgr: """Manages requests for Recorded Future Malware Intelligence API.""" - def __init__(self, rf_token: str = None): - """Initializes the `MalwareIntelMgr` object. - - Args: - rf_token (str, optional): Recorded Future API token. - """ + def __init__( + self, + rf_token: Annotated[str | None, Doc('Recorded Future API token.')] = None, + ): + """Initializes the `MalwareIntelMgr` object.""" self.log = logging.getLogger(__name__) self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient() diff --git a/psengine/risk_history/risk_history_mgr.py b/psengine/risk_history/risk_history_mgr.py index 5c3f8df1..568f711e 100644 --- a/psengine/risk_history/risk_history_mgr.py +++ b/psengine/risk_history/risk_history_mgr.py @@ -28,12 +28,11 @@ class RiskHistoryMgr: """Manages requests for Recorded Future Risk History information.""" - def __init__(self, rf_token: str = None): - """Initializes the `RiskHistoryMgr` object. - - Args: - rf_token (str, optional): Recorded Future API token. - """ + def __init__( + self, + rf_token: Annotated[str | None, Doc('Recorded Future API token.')] = None, + ): + """Initializes the `RiskHistoryMgr` object.""" self.log = logging.getLogger(__name__) self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient() diff --git a/tests/links/conftest.py b/tests/links/conftest.py new file mode 100644 index 00000000..67f903c3 --- /dev/null +++ b/tests/links/conftest.py @@ -0,0 +1,8 @@ +import pytest + +from psengine.links import LinksMgr + + +@pytest.fixture +def links_mgr(): + return LinksMgr() diff --git a/tests/links/test_links_mgr.py b/tests/links/test_links_mgr.py new file mode 100644 index 00000000..7f6697c1 --- /dev/null +++ b/tests/links/test_links_mgr.py @@ -0,0 +1,391 @@ +##################################### TERMS OF USE ########################################### +# The following code is provided for demonstration purpose only, and should not be used # +# without independent verification. Recorded Future makes no representations or warranties, # +# express, implied, statutory, or otherwise, regarding any aspect of this code or of the # +# information it may retrieve, and provides it both strictly “as-is” and without assuming # +# responsibility for any information it may retrieve. Recorded Future shall not be liable # +# for, and you assume all risk of using, the foregoing. By using this code, Customer # +# represents that it is solely responsible for having all necessary licenses, permissions, # +# rights, and/or consents to connect to third party APIs, and that it is solely responsible # +# for having all necessary licenses, permissions, rights, and/or consents to any data # +# accessed from any third party API. # +############################################################################################## + +import pytest +from pydantic import ValidationError +from requests import Response +from requests.exceptions import HTTPError + +from psengine.links import LinksMgr +from psengine.links.errors import LinksMetadataError, LinksSearchError +from psengine.links.models import ( + CriticalityAttribute, + GenericAttribute, + LinksFilterObjects, + MitreNameAttribute, + RiskAttribute, + ThreatActorAttribute, +) + + +def test_list_sections(links_mgr: LinksMgr, mocker, make_response): + mock_data = { + 'data': [ + {'id': 's1', 'name': 'Section 1', 'description': 'Desc 1'}, + {'id': 's2', 'name': 'Section 2', 'description': 'Desc 2'}, + ] + } + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response(mock_data)) + + sections = links_mgr.list_sections() + assert len(sections) == 2 + assert sections[0].id_ == 's1' + assert sections[1].name == 'Section 2' + + +def test_list_events(links_mgr: LinksMgr, mocker, make_response): + mock_data = { + 'data': [ + {'id': 'e1', 'name': 'Event 1', 'description': 'Desc 1'}, + {'id': 'e2', 'name': 'Event 2', 'description': 'Desc 2'}, + ] + } + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response(mock_data)) + + events = links_mgr.list_events() + assert len(events) == 2 + assert events[0].id_ == 'e1' + assert events[1].name == 'Event 2' + + +def test_list_entity_types(links_mgr: LinksMgr, mocker, make_response): + mock_data = { + 'data': [ + {'id': 'Type1', 'name': 'Type 1'}, + {'id': 'Type2', 'name': 'Type 2'}, + ] + } + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response(mock_data)) + + entity_types = links_mgr.list_entity_types() + assert len(entity_types) == 2 + assert entity_types[0].id_ == 'Type1' + assert entity_types[1].name == 'Type 2' + + +def test_search_basic(links_mgr: LinksMgr, mocker, make_response): + mock_data = { + 'data': [ + { + 'entity': {'id': 'ent1', 'name': 'Entity 1', 'type': 'Type1'}, + 'links': [ + { + 'id': 'link1', + 'name': 'Link 1', + 'type': 'Type2', + 'source': 'technical', + 'section': 's1', + 'attributes': [{'id': 'risk_score', 'value': 50}], + } + ], + } + ] + } + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response(mock_data)) + + results = links_mgr.search(entities=['ent1']) + assert len(results) == 1 + assert results[0].entity.id_ == 'ent1' + assert len(results[0].links) == 1 + assert results[0].links[0].id_ == 'link1' + + +def test_filter_objects_invalid_source(): + with pytest.raises(ValidationError, match='sources'): + LinksFilterObjects(sources=['invalid_source']) + + +def test_metadata_error(links_mgr: LinksMgr, mocker): + mock_resp = mocker.Mock(spec=Response) + mock_resp.status_code = 500 + mock_resp.text = 'Internal Server Error' + mocker.patch.object( + links_mgr.rf_client, + 'request', + side_effect=HTTPError('500 Server Error', response=mock_resp), + ) + + with pytest.raises(LinksMetadataError, match='500'): + links_mgr.list_sections() + + +def test_search_error(links_mgr: LinksMgr, mocker): + mock_resp = mocker.Mock(spec=Response) + mock_resp.status_code = 400 + mock_resp.text = 'Bad Request' + mocker.patch.object( + links_mgr.rf_client, + 'request', + side_effect=HTTPError('400 Client Error', response=mock_resp), + ) + + with pytest.raises(LinksSearchError, match='400'): + links_mgr.search(entities=['ent1']) + + +def test_search_complex_attributes(links_mgr: LinksMgr, mocker, make_response): + mock_data = { + 'data': [ + { + 'entity': {'id': 'ent1', 'name': 'Entity 1', 'type': 'Type1'}, + 'links': [ + { + 'id': 'link1', + 'name': 'Link 1', + 'type': 'Type2', + 'attributes': [ + {'id': 'risk_score', 'value': 75.0}, + {'id': 'risk_level', 'value': 'High'}, + {'id': 'criticality', 'value': 'Critical'}, + {'id': 'display_name', 'value': 'T1234'}, + {'id': 'threat_actor', 'value': True}, + {'id': 'unknown_attr', 'value': 'some value'}, + ], + } + ], + } + ] + } + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response(mock_data)) + + results = links_mgr.search(entities=['ent1']) + attrs = results[0].links[0].attributes + assert len(attrs) == 6 + + assert isinstance(attrs[0], RiskAttribute) + assert attrs[0].id_ == 'risk_score' + assert attrs[0].value == 75.0 + + assert isinstance(attrs[1], RiskAttribute) + assert attrs[1].id_ == 'risk_level' + assert attrs[1].value == 'High' + + assert isinstance(attrs[2], CriticalityAttribute) + assert attrs[2].value == 'Critical' + + assert isinstance(attrs[3], MitreNameAttribute) + assert attrs[3].value == 'T1234' + + assert isinstance(attrs[4], ThreatActorAttribute) + assert attrs[4].value is True + + assert isinstance(attrs[5], GenericAttribute) + assert attrs[5].id_ == 'unknown_attr' + assert attrs[5].value == 'some value' + + +def test_search_with_limits(links_mgr: LinksMgr, mocker, make_response): + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response({'data': []})) + + links_mgr.search(entities=['ent1'], search_scope='small', per_entity_type=10) + + _, kwargs = links_mgr.rf_client.request.call_args + assert kwargs['data']['limits']['search_scope'] == 'small' + assert kwargs['data']['limits']['per_entity_type'] == 10 + + +def test_entity_links_iocs_groups_results_and_defaults_risk_score( + links_mgr: LinksMgr, mocker, make_response +): + mock_data = { + 'data': [ + { + 'entity': {'id': 'ent1', 'name': 'Entity 1', 'type': 'Type1'}, + 'links': [ + { + 'id': 'ioc1', + 'name': 'example.com', + 'type': 'type:InternetDomainName', + 'source': 'technical', + 'attributes': [{'id': 'risk_score', 'value': 90}], + }, + { + 'id': 'ioc2', + 'name': 'abcdef1234', + 'type': 'type:Hash', + 'source': 'insikt', + 'attributes': [], + }, + { + 'id': 'not-ioc', + 'name': 'Not IOC', + 'type': 'type:Organization', + 'attributes': [{'id': 'threat_actor', 'value': True}], + }, + ], + } + ] + } + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response(mock_data)) + + entity_links = links_mgr.search(entities=['ent1'])[0] + iocs = entity_links.iocs() + + assert set(iocs.keys()) == {'type:InternetDomainName', 'type:Hash'} + assert len(iocs['type:InternetDomainName']) == 1 + assert len(iocs['type:Hash']) == 1 + assert iocs['type:InternetDomainName'][0].risk_score == 90 + assert iocs['type:InternetDomainName'][0].source == 'technical' + assert iocs['type:Hash'][0].risk_score == 0 + assert iocs['type:Hash'][0].source == 'insikt' + + +def test_entity_links_ttps_filters_mitre_attack_links(links_mgr: LinksMgr, mocker, make_response): + mock_data = { + 'data': [ + { + 'entity': {'id': 'ent1', 'name': 'Entity 1', 'type': 'Type1'}, + 'links': [ + { + 'id': 'ttp1', + 'name': 'T1059', + 'type': 'type:MitreAttackIdentifier', + 'source': 'technical', + 'attributes': [{'id': 'display_name', 'value': 'Command and Scripting'}], + }, + { + 'id': 'ttp2', + 'name': 'T1566', + 'type': 'type:MitreAttackIdentifier', + 'source': 'insikt', + 'attributes': [], + }, + { + 'id': 'not-ttp', + 'name': 'Not TTP', + 'type': 'type:Malware', + 'attributes': [], + }, + ], + } + ] + } + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response(mock_data)) + + ttps = links_mgr.search(entities=['ent1'])[0].ttps() + + assert len(ttps) == 2 + assert ttps[0].id_ == 'ttp1' + assert ttps[0].display_name == 'Command and Scripting' + assert ttps[0].source == 'technical' + assert ttps[1].id_ == 'ttp2' + assert ttps[1].display_name == 'N/A' + assert ttps[1].source == 'insikt' + + +def test_entity_links_threat_actors_only_returns_marked_organizations( + links_mgr: LinksMgr, mocker, make_response +): + mock_data = { + 'data': [ + { + 'entity': {'id': 'ent1', 'name': 'Entity 1', 'type': 'Type1'}, + 'links': [ + { + 'id': 'ta1', + 'name': 'Threat Actor Org', + 'type': 'type:Organization', + 'source': 'insikt', + 'attributes': [{'id': 'threat_actor', 'value': True}], + }, + { + 'id': 'org2', + 'name': 'Benign Org', + 'type': 'type:Organization', + 'attributes': [{'id': 'threat_actor', 'value': False}], + }, + { + 'id': 'not-org', + 'name': 'Not Org', + 'type': 'type:Malware', + 'attributes': [{'id': 'threat_actor', 'value': True}], + }, + ], + } + ] + } + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response(mock_data)) + + threat_actors = links_mgr.search(entities=['ent1'])[0].threat_actors() + + assert len(threat_actors) == 1 + assert threat_actors[0].id_ == 'ta1' + assert threat_actors[0].name == 'Threat Actor Org' + assert threat_actors[0].source == 'insikt' + + +def test_entity_links_malwares_only_returns_malware_links( + links_mgr: LinksMgr, mocker, make_response +): + mock_data = { + 'data': [ + { + 'entity': {'id': 'ent1', 'name': 'Entity 1', 'type': 'Type1'}, + 'links': [ + { + 'id': 'mw1', + 'name': 'Malware One', + 'type': 'type:Malware', + 'source': 'technical', + 'attributes': [], + }, + { + 'id': 'not-malware', + 'name': 'Not Malware', + 'type': 'type:Organization', + 'attributes': [{'id': 'threat_actor', 'value': True}], + }, + { + 'id': 'mw2', + 'name': 'Malware Two', + 'type': 'type:Malware', + 'source': 'insikt', + 'attributes': [], + }, + ], + } + ] + } + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response(mock_data)) + + malwares = links_mgr.search(entities=['ent1'])[0].malwares() + + assert len(malwares) == 2 + assert [malware.id_ for malware in malwares] == ['mw1', 'mw2'] + assert [malware.source for malware in malwares] == ['technical', 'insikt'] + + +def test_entity_links_threat_actors_skips_organization_missing_attribute( + links_mgr: LinksMgr, mocker, make_response +): + mock_data = { + 'data': [ + { + 'entity': {'id': 'ent1', 'name': 'Entity 1', 'type': 'Type1'}, + 'links': [ + { + 'id': 'org-no-attr', + 'name': 'Organization Without Flag', + 'type': 'type:Organization', + 'source': 'insikt', + 'attributes': [], + }, + ], + } + ] + } + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response(mock_data)) + + threat_actors = links_mgr.search(entities=['ent1'])[0].threat_actors() + + assert threat_actors == [] diff --git a/tests/links/test_links_models.py b/tests/links/test_links_models.py new file mode 100644 index 00000000..244d5ad2 --- /dev/null +++ b/tests/links/test_links_models.py @@ -0,0 +1,215 @@ +##################################### TERMS OF USE ########################################### +# The following code is provided for demonstration purpose only, and should not be used # +# without independent verification. Recorded Future makes no representations or warranties, # +# express, implied, statutory, or otherwise, regarding any aspect of this code or of the # +# information it may retrieve, and provides it both strictly “as-is” and without assuming # +# responsibility for any information it may retrieve. Recorded Future shall not be liable # +# for, and you assume all risk of using, the foregoing. By using this code, Customer # +# represents that it is solely responsible for having all necessary licenses, permissions, # +# rights, and/or consents to connect to third party APIs, and that it is solely responsible # +# for having all necessary licenses, permissions, rights, and/or consents to any data # +# accessed from any third party API. # +############################################################################################## + +import pytest +from pydantic import ValidationError + +from psengine.endpoints import EP_LINKS_SEARCH +from psengine.links.models import ( + FilterTechnical, + LinksFilterObjects, + LinksLimitsObjects, +) + + +def test_filter_technical_timeframe_invalid_format(): + with pytest.raises(ValidationError, match='Invalid relative time'): + FilterTechnical(timeframe='not-a-time') + + +def test_filter_technical_converts_string_fields_to_lists(): + model = FilterTechnical( + timeframe='-30d', + events='type:MalwareAnalysis', + connected_entities='id:Ent1', + ) + + assert model.timeframe == '-30d' + assert model.events == ['type:MalwareAnalysis'] + assert model.connected_entities == ['id:Ent1'] + + +def test_filter_technical_removes_none_values_in_list_fields(): + model = FilterTechnical( + events=['type:MalwareAnalysis', None, 'type:TTPAnalysis'], + connected_entities=['id:Ent1', None, 'id:Ent2'], + ) + + assert model.events == ['type:MalwareAnalysis', 'type:TTPAnalysis'] + assert model.connected_entities == ['id:Ent1', 'id:Ent2'] + + +def test_filter_technical_json_excludes_unset_fields(): + model = FilterTechnical(timeframe='-7d') + assert model.json() == {'timeframe': '-7d'} + + +def test_links_filter_objects_normalizes_scalar_fields(): + model = LinksFilterObjects( + sections='section:actors', + entity_types='type:IpAddress', + sources=['technical', 'insikt'], + technical=FilterTechnical(timeframe='-30d'), + ) + + assert model.json() == { + 'sections': ['section:actors'], + 'entity_types': ['type:IpAddress'], + 'sources': ['technical', 'insikt'], + 'technical': {'timeframe': '-30d'}, + } + + +def test_links_filter_objects_invalid_source(): + with pytest.raises(ValidationError, match='sources'): + LinksFilterObjects(sources=['invalid_source']) + + +def test_links_limits_objects_serializes_search_scope(): + model = LinksLimitsObjects(search_scope='small', per_entity_type=25) + assert model.json() == {'search_scope': 'small', 'per_entity_type': 25} + + +def test_links_limits_objects_invalid_search_scope(): + with pytest.raises(ValidationError, match='search_scope'): + LinksLimitsObjects(search_scope='huge') + + +def test_links_limits_objects_per_entity_type_rejects_non_positive(): + with pytest.raises(ValidationError, match='per_entity_type'): + LinksLimitsObjects(per_entity_type=0) + + +def test_links_mgr_search_builds_minimal_payload_with_entity_str(links_mgr, mocker, make_response): + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response({'data': []})) + + links_mgr.search(entities='ent1') + + _, kwargs = links_mgr.rf_client.request.call_args + assert kwargs['method'] == 'POST' + assert kwargs['url'] == EP_LINKS_SEARCH + assert kwargs['data'] == {'entities': ['ent1'], 'limits': {'search_scope': 'medium'}} + + +def test_links_mgr_search_omits_empty_technical_filter_object(links_mgr, mocker, make_response): + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response({'data': []})) + + links_mgr.search(entities='ent1', sections='section:actors') + + _, kwargs = links_mgr.rf_client.request.call_args + assert kwargs['data']['filters'] == {'sections': ['section:actors']} + + +@pytest.mark.parametrize( + ('search_kwargs', 'expected_filter_key', 'expected_value'), + [ + ({'sections': 'section:actors'}, ('sections',), ['section:actors']), + ({'entity_types': 'type:IpAddress'}, ('entity_types',), ['type:IpAddress']), + ({'events': 'type:MalwareAnalysis'}, ('technical', 'events'), ['type:MalwareAnalysis']), + ({'connected_entities': 'id:Ent1'}, ('technical', 'connected_entities'), ['id:Ent1']), + ], +) +def test_links_mgr_search_converts_str_filter_fields_to_lists( + links_mgr, mocker, make_response, search_kwargs, expected_filter_key, expected_value +): + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response({'data': []})) + + links_mgr.search(entities='ent1', **search_kwargs) + + _, kwargs = links_mgr.rf_client.request.call_args + filters = kwargs['data']['filters'] + target = filters + for key in expected_filter_key: + target = target[key] + assert target == expected_value + + +@pytest.mark.parametrize( + ('search_kwargs', 'expected_filter_key', 'expected_value'), + [ + ( + {'sections': ['section:actors', 'section:tools']}, + ('sections',), + ['section:actors', 'section:tools'], + ), + ( + {'entity_types': ['type:IpAddress', 'type:DomainName']}, + ('entity_types',), + ['type:IpAddress', 'type:DomainName'], + ), + ( + {'events': ['type:MalwareAnalysis', 'type:TTPAnalysis']}, + ('technical', 'events'), + ['type:MalwareAnalysis', 'type:TTPAnalysis'], + ), + ( + {'connected_entities': ['id:Ent1', 'id:Ent2']}, + ('technical', 'connected_entities'), + ['id:Ent1', 'id:Ent2'], + ), + ], +) +def test_links_mgr_search_preserves_list_filter_fields( + links_mgr, mocker, make_response, search_kwargs, expected_filter_key, expected_value +): + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response({'data': []})) + + links_mgr.search(entities='ent1', **search_kwargs) + + _, kwargs = links_mgr.rf_client.request.call_args + filters = kwargs['data']['filters'] + target = filters + for key in expected_filter_key: + target = target[key] + assert target == expected_value + + +def test_links_mgr_search_builds_full_payload_from_model_inputs(links_mgr, mocker, make_response): + mocker.patch.object(links_mgr.rf_client, 'request', return_value=make_response({'data': []})) + + links_mgr.search( + entities=['ent1', 'ent2'], + sections='section:actors', + entity_types='type:IpAddress', + sources=['technical', 'insikt'], + timeframe='-30d', + events='type:MalwareAnalysis', + connected_entities=['id:EntA', 'id:EntB'], + search_scope='large', + per_entity_type=10, + ) + + _, kwargs = links_mgr.rf_client.request.call_args + assert kwargs['data'] == { + 'entities': ['ent1', 'ent2'], + 'filters': { + 'sections': ['section:actors'], + 'entity_types': ['type:IpAddress'], + 'sources': ['technical', 'insikt'], + 'technical': { + 'timeframe': '-30d', + 'events': ['type:MalwareAnalysis'], + 'connected_entities': ['id:EntA', 'id:EntB'], + }, + }, + 'limits': {'search_scope': 'large', 'per_entity_type': 10}, + } + + +def test_links_mgr_search_invalid_timeframe_raises_before_request(links_mgr, mocker): + request_mock = mocker.patch.object(links_mgr.rf_client, 'request') + + with pytest.raises(ValidationError, match='Invalid relative time'): + links_mgr.search(entities=['ent1'], timeframe='invalid-time') + + request_mock.assert_not_called() diff --git a/uv.lock b/uv.lock index d0543427..ef9a364d 100644 --- a/uv.lock +++ b/uv.lock @@ -3,7 +3,7 @@ revision = 3 requires-python = ">=3.10, <3.15" [options] -exclude-newer = "2026-04-24T09:50:18.171105Z" +exclude-newer = "2026-05-12T14:28:43.699679Z" exclude-newer-span = "P7D" [[package]] @@ -822,7 +822,7 @@ wheels = [ [[package]] name = "psengine" -version = "2.6.0" +version = "2.7.0" source = { editable = "." } dependencies = [ { name = "jsonpath-ng" },