Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/test-build-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e .
pip install -r ./requirements/development.txt
pip install -e ".[test]"

- name: Setup E2E environment
run: |
Expand Down
45 changes: 41 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,32 @@ You can get the status code of a response outside of exception handling by doing
api.domain_profile('google.com').status == 200
```

IrisQL
===================

IrisQL is a query language for Iris Investigate that lets you express complex, multi-field searches in a single request. Pass the query as a raw string via the `irisql` parameter. The query must begin with `# IrisQL-1.0`.

```python
query = """# IrisQL-1.0
DOMAIN CONTAINS "phishing"
AND
RISK_SCORE GREATER_THAN 85
"""

results = api.iris_investigate(irisql=query)
print(results["results_count"])
for domain in results:
print(domain["domain"])
```

Pagination parameters (`page_size`, `sort_by`, `position`) are supported alongside IrisQL via `**kwargs`:

```python
results = api.iris_investigate(irisql=query, page_size=50, sort_by="risk_score", position=0)
```

When `irisql` is set, any domain or filter parameters passed alongside it are silently ignored. IrisQL uses header-based authentication (`X-Api-Key`) automatically.

Using the API Asynchronously
===================

Expand Down Expand Up @@ -201,6 +227,18 @@ Optionally, you can specify the desired format (html, xml, json, or list) of the
domaintools domain_search google --max_length 10 -u $TEST_USER -k $TEST_KEY -f html
```

IrisQL queries are supported via the `--irisql` flag on `iris_investigate`. The query must begin with `# IrisQL-1.0` on its own line:

```bash
domaintools iris_investigate --irisql $'# IrisQL-1.0\nDOMAIN CONTAINS "phishing"' -u $TEST_USER -k $TEST_KEY
```

Pagination parameters can be passed alongside the IrisQL query:

```bash
domaintools iris_investigate --irisql $'# IrisQL-1.0\nDOMAIN CONTAINS "phishing"' --page-size 50 --sort-by risk_score -u $TEST_USER -k $TEST_KEY
```

To avoid having to type in your API key repeatedly, you can specify them in `~/.dtapi` separated by a new line:

```bash
Expand Down Expand Up @@ -289,12 +327,11 @@ To add more e2e tests, put these in the `../tests/e2e` folder.
source venv/bin/activate
```

- Install dependencies.
- Install dependencies (with test extras):
```bash
pip install -r requirements/development.txt
pip install -e ".[test]"
```

- From the python_api project root directory, install the package.
Or without test dependencies:
```bash
pip install -e .
```
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.7.4
2.8.0
2 changes: 1 addition & 1 deletion domaintools/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@

"""

current = "2.7.4"
current = "2.8.0"
74 changes: 72 additions & 2 deletions domaintools/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,57 @@ def domain_profile(self, query, **kwargs):
"""Returns a profile for the specified domain name"""
return self._results("domain-profile", "/v1/{0}".format(query))

def domain_history(
self,
query,
include_fields=None,
exclude_fields=None,
page_size=None,
offset=None,
next=None,
parsed_whois=None,
parsed_domain_rdap=None,
**kwargs,
):
"""Returns the history of changes for a given domain name.

Results are returned in reverse chronological order. Each change event includes
a timestamp, the field that changed, and the complete before/after domain state.

Args:
query: The apex domain name to retrieve history for (e.g. "domaintools.com").
include_fields: Comma-separated list of exact field names. Only change events
matching these fields appear in results. Cannot be combined with
exclude_fields. Supports aggregate prefixes (e.g. "all_ssl", "all_ip").
Example: "ip,registrar,all_ssl"
exclude_fields: Comma-separated list of exact field names. Change events
matching these fields are omitted. Cannot be combined with include_fields.
Example: "all_web_trackers,all_ssl"
page_size: Number of change events per page. Maximum and default is 100.
offset: 0-indexed starting point for pagination. Increment by page_size for
each subsequent page.
next: When True, includes a next URL in the response for cursor-based
pagination. Auth parameters must still be included when following it.
parsed_whois: When True, includes the full parsed WHOIS record in the
before/after objects of each change event.
parsed_domain_rdap: When True, includes the full parsed Domain RDAP record
in the before/after objects of each change event.
"""
return self._results(
"domain-history",
"/v1/domain-history",
domain=query,
include_fields=include_fields,
exclude_fields=exclude_fields,
page_size=page_size,
offset=offset,
next=next,
parsed_whois=parsed_whois,
parsed_domain_rdap=parsed_domain_rdap,
items_path=("changes",),
**kwargs,
)

def domain_search(
self,
query,
Expand Down Expand Up @@ -662,13 +713,14 @@ def iris_investigate(
updated_after=None,
include_domains_with_missing_field=None,
exclude_domains_with_missing_field=None,
irisql=None,
**kwargs,
):
"""Returns back a list of domains based on the provided filters.

You can loop over results of your investigation as if it was a native Python list:

for result in api.iris_investigate(ip='199.30.228.112'): # Enables looping over all related results
for result in api.iris_investigate(ip='199.30.228.112'):

api.iris_investigate(QUERY)['results_count'] Returns the number of results returned with this request
api.iris_investigate(QUERY)['total_count'] Returns the number of results available within Iris
Expand All @@ -677,9 +729,27 @@ def iris_investigate(
api.iris_investigate(QUERY)['position'] Returns the position key that can be used to retrieve the next page:
next_page = api.iris_investigate(QUERY, position=api.iris_investigate(QUERY)['position'])

for enrichment in api.iris_enrich(i): # Enables looping over all returned enriched domains
IrisQL mode (mutually exclusive with all other search parameters):

irisql: str: A raw IrisQL query string. Must begin with '# IrisQL-1.0'.
Sent as a raw POST body (text/plain). When set, all domain/filter params are ignored.
Pagination params (page_size, sort_by, position) are still supported via **kwargs.

Example:
api.iris_investigate(irisql='# IrisQL-1.0\\nDOMAIN CONTAINS "phishing"', page_size=50, sort_by='risk_score')

"""
if irisql is not None:
if domains:
print("Warning: irisql is set — ignoring 'domains' and other search parameters. IrisQL query takes precedence.")
return self._results(
"iris-investigate",
"/v1/iris-investigate/",
items_path=("results",),
irisql=irisql,
**kwargs,
)

# We put search_hash in the signature definition so the CLI can see it as a valid arg
if search_hash:
kwargs["search_hash"] = search_hash
Expand Down
30 changes: 24 additions & 6 deletions domaintools/base_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
RequestUriTooLongException,
)


try: # pragma: no cover
from collections.abc import MutableMapping, MutableSequence
except ImportError: # pragma: no cover
Expand Down Expand Up @@ -108,6 +107,19 @@ def _make_request(self):
"iris-enrich",
"iris-detect-escalate-domains",
]:
if self.product == "iris-investigate" and "irisql" in self.kwargs:
irisql_query = self.kwargs["irisql"]
auth_keys = {"api_username", "timestamp", "signature", "api_key"}
query_params = {
k: v for k, v in self.kwargs.items() if k != "irisql" and k not in auth_keys
}
query_params.update(self.api.extra_request_params)
return session.post(
url=self.url,
content=irisql_query,
params=query_params,
headers={**headers, "Content-Type": "text/plain", "X-Api-Key": self.api.key},
)
post_data = self.kwargs.copy()
post_data.update(self.api.extra_request_params)
return session.post(url=self.url, data=post_data, headers=headers)
Expand Down Expand Up @@ -153,21 +165,27 @@ def data(self):
self._data = results.json()
else:
self._data = results.text

self.check_limit_exceeded()

return self._data

def check_limit_exceeded(self):
limit_exceeded, reason = False, ""

if isinstance(self._data, dict) and (
"response" in self._data
and "limit_exceeded" in self._data["response"]
and self._data["response"]["limit_exceeded"] is True
):
limit_exceeded, reason = True, self._data["response"]["message"]
elif "response" in self._data and "limit_exceeded" in self._data:
limit_exceeded = True
# check for xml format, and return the actual error message
if self.kwargs.get("format") == "xml" and isinstance(self._data, str):
if re.search(r"<limit_exceeded>1</limit_exceeded>", self._data):
msg = re.search(r"<message>(.*?)</message>", self._data)
limit_exceeded, reason = True, msg.group(1) if msg else ""
else:
limit_exceeded = True

if limit_exceeded:
raise ServiceException(503, f"Limit Exceeded {reason}")
Expand Down Expand Up @@ -277,6 +295,8 @@ def __exit__(self, *args):

@property
def json(self):
if self._data is not None:
return self
self.kwargs.pop("format", None)
return self.__class__(
format="json",
Expand Down Expand Up @@ -341,9 +361,7 @@ def html(self):
)

def as_list(self):
return "\n".join(
[json.dumps(item, indent=4, separators=(",", ": ")) for item in self._items()]
)
return "\n".join([json.dumps(item, indent=4, separators=(",", ": ")) for item in self._items()])

def __str__(self):
return str(
Expand Down
27 changes: 10 additions & 17 deletions domaintools/cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,28 +230,21 @@ def run(cls, name: str, params: Optional[Dict] = {}, **kwargs):
description=f"Preparing results with format of {response_format}...",
)

if name not in ("available_api_calls",) and not getattr(response, "product", None) in RTTF_PRODUCTS_LIST:
response.data()

output = cls._get_formatted_output(
cmd_name=name, response=response, out_format=response_format
)

if isinstance(out_file, _io.TextIOWrapper):
progress.update(
task_id,
description=f"Printing the results with format of {response_format}...",
)
# use rich `print` command to prettify the ouput in sys.stdout
if name not in ("available_api_calls",) and response.product in RTTF_PRODUCTS_LIST:
for feeds in response.response():
print(feeds)
else:
print(response)
if isinstance(out_file, _io.TextIOWrapper):
if name not in ("available_api_calls",) and response.product in RTTF_PRODUCTS_LIST:
for feeds in response.response():
print(feeds)
else:
progress.update(
task_id,
description=f"Writing results to {out_file}",
)
# if it's a file then write
out_file.write(output if output.endswith("\n") else output + "\n")
print(output)
else:
out_file.write(output if output.endswith("\n") else output + "\n")
except Exception as e:
if isinstance(e, ServiceException):
code = typer.style(getattr(e, "code", 400), fg=typer.colors.BRIGHT_RED)
Expand Down
46 changes: 46 additions & 0 deletions domaintools/cli/commands/domains.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,51 @@ def domain_profile(
DTCLICommand.run(name=c.DOMAIN_PROFILE, params=ctx.params)


@dt_cli.command(
name=c.DOMAIN_HISTORY,
help=get_cli_helptext_by_name(command_name=c.DOMAIN_HISTORY),
)
def domain_history(
ctx: typer.Context,
query: str = typer.Option(..., "-q", "--query", help="The apex domain name to retrieve history for (e.g. domaintools.com)."),
include_fields: str = typer.Option(None, "--include-fields", help="Comma-separated list of exact field names. Only change events matching these fields appear in results. Cannot be combined with --exclude-fields. Example: ip,registrar,all_ssl"),
exclude_fields: str = typer.Option(None, "--exclude-fields", help="Comma-separated list of exact field names. Change events matching these fields are omitted. Cannot be combined with --include-fields. Example: all_web_trackers,all_ssl"),
page_size: int = typer.Option(None, "--page-size", help="Number of change events per page. Maximum is 100 (default: 100)."),
offset: int = typer.Option(None, "--offset", help="0-indexed starting point for pagination. Increment by page-size for each subsequent page."),
next: bool = typer.Option(None, "--next", help="When true, includes a next URL in the response for cursor-based pagination."),
parsed_whois: bool = typer.Option(None, "--parsed-whois", help="When true, includes the full parsed WHOIS record in the before/after objects of each change event."),
parsed_domain_rdap: bool = typer.Option(None, "--parsed-domain-rdap", help="When true, includes the full parsed Domain RDAP record in the before/after objects of each change event."),
user: str = typer.Option(None, "-u", "--user", help="Domaintools API Username."),
key: str = typer.Option(None, "-k", "--key", help="DomainTools API key"),
creds_file: str = typer.Option(
"~/.dtapi",
"-c",
"--credfile",
help="Optional file with API username and API key, one per line.",
),
rate_limit: bool = typer.Option(
False,
"-l",
"--rate-limit",
help="Rate limit API calls against the API based on per minute limits.",
),
format: str = typer.Option(
"json",
"-f",
"--format",
help="Output format in {'list', 'json', 'xml', 'html'}",
callback=DTCLICommand.validate_format_input,
),
out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"),
no_verify_ssl: bool = typer.Option(
False,
"--no-verify-ssl",
help="Skip verification of SSL certificate when making HTTPs API calls",
),
):
DTCLICommand.run(name=c.DOMAIN_HISTORY, params=ctx.params)


@dt_cli.command(
name=c.DOMAIN_SEARCH,
help=get_cli_helptext_by_name(command_name=c.DOMAIN_SEARCH),
Expand Down Expand Up @@ -666,6 +711,7 @@ def risk_evidence(

__all__ = [
"brand_monitor",
"domain_history",
"domain_profile",
"domain_search",
"name_server_monitor",
Expand Down
5 changes: 5 additions & 0 deletions domaintools/cli/commands/iris.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ def iris_investigate(
create_date: str = typer.Option(None, "--create-date", help="The create date."),
active: bool = typer.Option(None, "--active", help="The domains that are in active state"),
search_hash: str = typer.Option(None, "--search-hash", help="The search hash to use"),
irisql: str = typer.Option(
None,
"--irisql",
help="IrisQL query string (must begin with '# IrisQL-1.0'). Mutually exclusive with domain/filter params. Pagination kwargs (--page-size, --sort-by, --position) are still supported.",
),
src_file: str = typer.Option(
None,
"-s",
Expand Down
1 change: 1 addition & 0 deletions domaintools/cli/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

# domains
BRAND_MONITOR = "brand_monitor"
DOMAIN_HISTORY = "domain_history"
DOMAIN_PROFILE = "domain_profile"
DOMAIN_SEARCH = "domain_search"
HOSTING_HISTORY = "hosting_history"
Expand Down
1 change: 1 addition & 0 deletions domaintools/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def _iris_investigate_helptext():
c.IRIS_INVESTIGATE: _iris_investigate_helptext(),
c.IRIS_ENRICH: "Returns back enriched data related to the specified domains using our Iris Enrich service.",
c.BRAND_MONITOR: "Pass in one or more terms as a list or separated by the pipe character ( | )",
c.DOMAIN_HISTORY: "Returns the history of changes for a given domain name.",
c.DOMAIN_PROFILE: "Returns a profile for the specified domain name",
c.DOMAIN_SEARCH: """Each term in the query string must be at least three characters long. Pass in a list or use spaces to separate multiple terms.""",
c.HOSTING_HISTORY: "Returns the hosting history from the given domain name.",
Expand Down
Loading
Loading