Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,368 changes: 1,368 additions & 0 deletions domaintools_iris.json

Large diffs are not rendered by default.

262 changes: 262 additions & 0 deletions domaintools_iris_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ class DomainToolsConnector(BaseConnector):
ACTION_ID_PARSED_DOMAIN_RDAP_FEED = "parsed_domain_rdap_feed"
ACTION_ID_DOMAIN_RISK_FEED = "domain_risk_feed"
ACTION_ID_DOMAIN_HOTLIST_FEED = "domain_hotlist_feed"

# Iris Detect action_ids
ACTION_ID_IRIS_DETECT_GET_NEW_DOMAINS = "iris_detect_get_new_domains"
ACTION_ID_IRIS_DETECT_GET_WATCHED_DOMAINS = "iris_detect_get_watched_domains"
ACTION_ID_IRIS_DETECT_GET_IGNORED_DOMAINS = "iris_detect_get_ignored_domains"
ACTION_ID_IRIS_DETECT_GET_ESCALATED_DOMAINS = "iris_detect_get_escalated_domains"
ACTION_ID_IRIS_DETECT_GET_BLOCKLIST_DOMAINS = "iris_detect_get_blocklist_domains"
ACTION_ID_IRIS_DETECT_GET_MONITORS_LIST = "iris_detect_get_monitors_list"
ACTION_ID_IRIS_DETECT_ESCALATE_DOMAINS = "iris_detect_escalate_domains"
ACTION_ID_IRIS_DETECT_BLOCKLIST_DOMAINS = "iris_detect_blocklist_domains"
ACTION_ID_IRIS_DETECT_WATCH_DOMAINS = "iris_detect_watch_domains"
ACTION_ID_IRIS_DETECT_IGNORE_DOMAINS = "iris_detect_ignore_domains"

RTUF_SERVICES_LIST = [
"nod",
"nad",
Expand Down Expand Up @@ -83,6 +96,16 @@ def __init__(self):
self.ACTION_ID_PARSED_DOMAIN_RDAP_FEED: self._parsed_domain_rdap_feed,
self.ACTION_ID_DOMAIN_RISK_FEED: self._domain_risk_feed,
self.ACTION_ID_DOMAIN_HOTLIST_FEED: self._domain_hotlist_feed,
self.ACTION_ID_IRIS_DETECT_GET_NEW_DOMAINS: self._iris_detect_get_new_domains,
self.ACTION_ID_IRIS_DETECT_GET_WATCHED_DOMAINS: self._iris_detect_get_watched_domains,
self.ACTION_ID_IRIS_DETECT_GET_IGNORED_DOMAINS: self._iris_detect_get_ignored_domains,
self.ACTION_ID_IRIS_DETECT_GET_ESCALATED_DOMAINS: self._iris_detect_get_escalated_domains,
self.ACTION_ID_IRIS_DETECT_GET_BLOCKLIST_DOMAINS: self._iris_detect_get_blocklist_domains,
self.ACTION_ID_IRIS_DETECT_GET_MONITORS_LIST: self._iris_detect_get_monitors_list,
self.ACTION_ID_IRIS_DETECT_ESCALATE_DOMAINS: self._iris_detect_escalate_domains,
self.ACTION_ID_IRIS_DETECT_BLOCKLIST_DOMAINS: self._iris_detect_blocklist_domains,
self.ACTION_ID_IRIS_DETECT_WATCH_DOMAINS: self._iris_detect_watch_domains,
self.ACTION_ID_IRIS_DETECT_IGNORE_DOMAINS: self._iris_detect_ignore_domains,
}

def initialize(self):
Expand Down Expand Up @@ -983,6 +1006,245 @@ def _get_rtuf_actions_params(self, param):

return params

def _get_dt_api(self):
return API(
self._username,
self._key,
app_partner=self.app_partner,
app_name=self.app_name,
app_version=self.app_version_number,
proxy_url=self._proxy_url,
verify_ssl=self._ssl,
https=self._ssl if isinstance(self._ssl, bool) else True,
)

def _parse_detect_response(self, action_result, results, summary_key="domain_count"):
data = list(results)
action_result.update_data(data)
action_result.update_summary({summary_key: len(data)})
return action_result.set_status(phantom.APP_SUCCESS)

def _detect_error(self, action_result, e):
error_code, error_msg = self._get_error_message_from_exception(e)
return action_result.set_status(phantom.APP_ERROR, f"Error Code: {error_code}. Error Message: {error_msg}")

def _iris_detect_get_new_domains(self, param):
self.save_progress(f"Starting {self.ACTION_ID_IRIS_DETECT_GET_NEW_DOMAINS} action.")
action_result = self.add_action_result(ActionResult(dict(param)))
try:
dt_api = self._get_dt_api()
results = dt_api.iris_detect_new_domains(
monitor_id=param.get("monitor_id"),
tlds=param.get("tlds"),
risk_score_ranges=param.get("risk_score_ranges"),
mx_exists=param.get("mx_exists"),
discovered_since=param.get("discovered_since"),
changed_since=param.get("changed_since"),
search=param.get("search"),
sort=param.get("sort"),
order=param.get("order"),
include_domain_data=param.get("include_domain_data", False),
limit=param.get("limit"),
preview=param.get("preview"),
)
ret_val = self._parse_detect_response(action_result, results)
self.save_progress(f"Completed {self.ACTION_ID_IRIS_DETECT_GET_NEW_DOMAINS} action.")
return ret_val
except Exception as e:
return self._detect_error(action_result, e)

def _iris_detect_get_watched_domains(self, param):
self.save_progress(f"Starting {self.ACTION_ID_IRIS_DETECT_GET_WATCHED_DOMAINS} action.")
action_result = self.add_action_result(ActionResult(dict(param)))
try:
dt_api = self._get_dt_api()
results = dt_api.iris_detect_watched_domains(
monitor_id=param.get("monitor_id"),
tlds=param.get("tlds"),
risk_score_ranges=param.get("risk_score_ranges"),
mx_exists=param.get("mx_exists"),
discovered_since=param.get("discovered_since"),
changed_since=param.get("changed_since"),
escalated_since=param.get("escalated_since"),
search=param.get("search"),
sort=param.get("sort"),
order=param.get("order"),
include_domain_data=param.get("include_domain_data", False),
limit=param.get("limit"),
preview=param.get("preview"),
)
ret_val = self._parse_detect_response(action_result, results)
self.save_progress(f"Completed {self.ACTION_ID_IRIS_DETECT_GET_WATCHED_DOMAINS} action.")
return ret_val
except Exception as e:
return self._detect_error(action_result, e)

def _iris_detect_get_ignored_domains(self, param):
self.save_progress(f"Starting {self.ACTION_ID_IRIS_DETECT_GET_IGNORED_DOMAINS} action.")
action_result = self.add_action_result(ActionResult(dict(param)))
try:
dt_api = self._get_dt_api()
results = dt_api.iris_detect_ignored_domains(
monitor_id=param.get("monitor_id"),
tlds=param.get("tlds"),
risk_score_ranges=param.get("risk_score_ranges"),
mx_exists=param.get("mx_exists"),
discovered_since=param.get("discovered_since"),
changed_since=param.get("changed_since"),
escalated_since=param.get("escalated_since"),
search=param.get("search"),
sort=param.get("sort"),
order=param.get("order"),
include_domain_data=param.get("include_domain_data", False),
limit=param.get("limit"),
preview=param.get("preview"),
)
ret_val = self._parse_detect_response(action_result, results)
self.save_progress(f"Completed {self.ACTION_ID_IRIS_DETECT_GET_IGNORED_DOMAINS} action.")
return ret_val
except Exception as e:
return self._detect_error(action_result, e)

def _iris_detect_get_escalated_domains(self, param):
self.save_progress(f"Starting {self.ACTION_ID_IRIS_DETECT_GET_ESCALATED_DOMAINS} action.")
action_result = self.add_action_result(ActionResult(dict(param)))
try:
dt_api = self._get_dt_api()
results = dt_api.iris_detect_watched_domains(
monitor_id=param.get("monitor_id"),
escalation_types=["google_safe"],
tlds=param.get("tlds"),
risk_score_ranges=param.get("risk_score_ranges"),
mx_exists=param.get("mx_exists"),
discovered_since=param.get("discovered_since"),
changed_since=param.get("changed_since"),
escalated_since=param.get("escalated_since"),
search=param.get("search"),
sort=param.get("sort"),
order=param.get("order"),
include_domain_data=param.get("include_domain_data", False),
limit=param.get("limit"),
preview=param.get("preview"),
)
ret_val = self._parse_detect_response(action_result, results)
self.save_progress(f"Completed {self.ACTION_ID_IRIS_DETECT_GET_ESCALATED_DOMAINS} action.")
return ret_val
except Exception as e:
return self._detect_error(action_result, e)

def _iris_detect_get_blocklist_domains(self, param):
self.save_progress(f"Starting {self.ACTION_ID_IRIS_DETECT_GET_BLOCKLIST_DOMAINS} action.")
action_result = self.add_action_result(ActionResult(dict(param)))
try:
dt_api = self._get_dt_api()
results = dt_api.iris_detect_watched_domains(
monitor_id=param.get("monitor_id"),
escalation_types=["blocked"],
tlds=param.get("tlds"),
risk_score_ranges=param.get("risk_score_ranges"),
mx_exists=param.get("mx_exists"),
discovered_since=param.get("discovered_since"),
changed_since=param.get("changed_since"),
escalated_since=param.get("escalated_since"),
search=param.get("search"),
sort=param.get("sort"),
order=param.get("order"),
include_domain_data=param.get("include_domain_data", False),
limit=param.get("limit"),
preview=param.get("preview"),
)
ret_val = self._parse_detect_response(action_result, results)
self.save_progress(f"Completed {self.ACTION_ID_IRIS_DETECT_GET_BLOCKLIST_DOMAINS} action.")
return ret_val
except Exception as e:
return self._detect_error(action_result, e)

def _iris_detect_get_monitors_list(self, param):
self.save_progress(f"Starting {self.ACTION_ID_IRIS_DETECT_GET_MONITORS_LIST} action.")
action_result = self.add_action_result(ActionResult(dict(param)))
try:
dt_api = self._get_dt_api()
include_counts = param.get("include_counts", False)
kwargs = {}
if include_counts:
kwargs["include_counts"] = True
kwargs["datetime_counts_since"] = param.get("datetime_counts_since")
results = dt_api.iris_detect_monitors(
sort=param.get("sort"),
order=param.get("order", "desc"),
limit=param.get("limit"),
**kwargs,
)
ret_val = self._parse_detect_response(action_result, results, summary_key="monitor_count")
self.save_progress(f"Completed {self.ACTION_ID_IRIS_DETECT_GET_MONITORS_LIST} action.")
return ret_val
except Exception as e:
return self._detect_error(action_result, e)

def _iris_detect_escalate_domains(self, param):
self.save_progress(f"Starting {self.ACTION_ID_IRIS_DETECT_ESCALATE_DOMAINS} action.")
action_result = self.add_action_result(ActionResult(dict(param)))
try:
dt_api = self._get_dt_api()
watchlist_domain_ids = param.get("watchlist_domain_ids", "").replace(" ", "").split(",")
results = dt_api.iris_detect_escalate_domains(
watchlist_domain_ids=watchlist_domain_ids,
escalation_type="google_safe",
)
ret_val = self._parse_detect_response(action_result, results, summary_key="escalated_count")
self.save_progress(f"Completed {self.ACTION_ID_IRIS_DETECT_ESCALATE_DOMAINS} action.")
return ret_val
except Exception as e:
return self._detect_error(action_result, e)

def _iris_detect_blocklist_domains(self, param):
self.save_progress(f"Starting {self.ACTION_ID_IRIS_DETECT_BLOCKLIST_DOMAINS} action.")
action_result = self.add_action_result(ActionResult(dict(param)))
try:
dt_api = self._get_dt_api()
watchlist_domain_ids = param.get("watchlist_domain_ids", "").replace(" ", "").split(",")
results = dt_api.iris_detect_escalate_domains(
watchlist_domain_ids=watchlist_domain_ids,
escalation_type="blocked",
)
ret_val = self._parse_detect_response(action_result, results, summary_key="blocklisted_count")
self.save_progress(f"Completed {self.ACTION_ID_IRIS_DETECT_BLOCKLIST_DOMAINS} action.")
return ret_val
except Exception as e:
return self._detect_error(action_result, e)

def _iris_detect_watch_domains(self, param):
self.save_progress(f"Starting {self.ACTION_ID_IRIS_DETECT_WATCH_DOMAINS} action.")
action_result = self.add_action_result(ActionResult(dict(param)))
try:
dt_api = self._get_dt_api()
watchlist_domain_ids = param.get("watchlist_domain_ids", "").replace(" ", "").split(",")
results = dt_api.iris_detect_manage_watchlist_domains(
watchlist_domain_ids=watchlist_domain_ids,
state="watched",
)
ret_val = self._parse_detect_response(action_result, results, summary_key="watched_count")
self.save_progress(f"Completed {self.ACTION_ID_IRIS_DETECT_WATCH_DOMAINS} action.")
return ret_val
except Exception as e:
return self._detect_error(action_result, e)

def _iris_detect_ignore_domains(self, param):
self.save_progress(f"Starting {self.ACTION_ID_IRIS_DETECT_IGNORE_DOMAINS} action.")
action_result = self.add_action_result(ActionResult(dict(param)))
try:
dt_api = self._get_dt_api()
watchlist_domain_ids = param.get("watchlist_domain_ids", "").replace(" ", "").split(",")
results = dt_api.iris_detect_manage_watchlist_domains(
watchlist_domain_ids=watchlist_domain_ids,
state="ignored",
)
ret_val = self._parse_detect_response(action_result, results, summary_key="ignored_count")
self.save_progress(f"Completed {self.ACTION_ID_IRIS_DETECT_IGNORE_DOMAINS} action.")
return ret_val
except Exception as e:
return self._detect_error(action_result, e)


if __name__ == "__main__":
import argparse
Expand Down
8 changes: 8 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# Test configuration
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --tb=short"

# Ruff linting
[tool.ruff]
line-length = 145
Expand Down
4 changes: 4 additions & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pytest>=7.4
pytest-mock>=3.12
domaintools_api==2.7.0
tldextract==5.3.0
76 changes: 76 additions & 0 deletions tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Running Tests

## Setup

Create and activate a virtual environment, then install test dependencies:

```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements-test.txt
```

> The `.venv` directory is already created if you have run tests before. Just activate it.

## Running Tests

### All tests
```bash
.venv/bin/python -m pytest
```

### A specific test file
```bash
.venv/bin/python -m pytest tests/test_iris_detect_get_new_domains.py
```

### A specific test case
```bash
.venv/bin/python -m pytest tests/test_iris_detect_get_new_domains.py::TestIrisDetectGetNewDomains::test_returns_success_with_results
```

### With verbose output
```bash
.venv/bin/python -m pytest -v
```

### Stop on first failure
```bash
.venv/bin/python -m pytest -x
```

## Test Files

| File | Action Tested |
|---|---|
| `test_iris_detect_get_new_domains.py` | `_iris_detect_get_new_domains` |
| `test_iris_detect_get_watched_domains.py` | `_iris_detect_get_watched_domains` |
| `test_iris_detect_get_ignored_domains.py` | `_iris_detect_get_ignored_domains` |
| `test_iris_detect_get_escalated_domains.py` | `_iris_detect_get_escalated_domains` |
| `test_iris_detect_get_blocklist_domains.py` | `_iris_detect_get_blocklist_domains` |
| `test_iris_detect_get_monitors_list.py` | `_iris_detect_get_monitors_list` |
| `test_iris_detect_escalate_domains.py` | `_iris_detect_escalate_domains` |
| `test_iris_detect_blocklist_domains.py` | `_iris_detect_blocklist_domains` |
| `test_iris_detect_watch_domains.py` | `_iris_detect_watch_domains` |
| `test_iris_detect_ignore_domains.py` | `_iris_detect_ignore_domains` |

## How It Works

The `phantom.*` packages are only available inside a real Splunk SOAR instance. The `conftest.py` file stubs out the entire `phantom` namespace before the connector is imported, allowing tests to run locally without a SOAR instance.

The `domaintools.API` wrapper is patched via the `mock_dt_api` fixture in `conftest.py`, so no real API credentials or network access are needed.

### Key fixtures (defined in `conftest.py`)

| Fixture | Description |
|---|---|
| `connector` | A pre-configured `DomainToolsConnector` instance |
| `mock_dt_api` | Patches `_get_dt_api` and returns a `MagicMock` API instance |

### Response helpers (defined in `conftest.py`)

| Helper | Returns |
|---|---|
| `make_domain(...)` | A fake domain dict matching the Iris Detect API response shape |
| `make_monitor(...)` | A fake monitor dict |
| `make_escalation(...)` | A fake escalation dict |
Empty file added tests/__init__.py
Empty file.
Loading