diff --git a/servicenow-cmdb-export/.env.example b/servicenow-cmdb-export/.env.example new file mode 100644 index 0000000..eaba9c3 --- /dev/null +++ b/servicenow-cmdb-export/.env.example @@ -0,0 +1,3 @@ +SNOW_INSTANCE=acme.service-now.com +SNOW_USER=cmdb_reader +SNOW_PASSWORD=changeme diff --git a/servicenow-cmdb-export/.gitignore b/servicenow-cmdb-export/.gitignore new file mode 100644 index 0000000..6009ba4 --- /dev/null +++ b/servicenow-cmdb-export/.gitignore @@ -0,0 +1,9 @@ +__pycache__/ +*.py[cod] +*.egg-info/ +.venv/ +venv/ +.env +*.xlsx +!sample.xlsx +.pytest_cache/ diff --git a/servicenow-cmdb-export/README.md b/servicenow-cmdb-export/README.md new file mode 100644 index 0000000..9861dd0 --- /dev/null +++ b/servicenow-cmdb-export/README.md @@ -0,0 +1,55 @@ +# servicenow-cmdb-export + +CLI that pulls CMDB reference records from ServiceNow and writes a denormalized +device-to-business-app-and-environment mapping to an Excel file. Servers and +client devices in one sheet, joined to apps via `cmdb_rel_ci` (with an +assigned-user fallback for clients). + +## Install + +```bash +pip install -e . +``` + +## Configure + +Set environment variables (see `.env.example`): + +- `SNOW_INSTANCE` — e.g. `acme.service-now.com` +- `SNOW_USER` / `SNOW_PASSWORD` — basic-auth credentials with read on + `cmdb_ci_server`, `cmdb_ci_pc_hardware`, `cmdb_ci_business_app`, + `cmdb_ci_environment`, `cmdb_rel_ci`, `sys_user`. + +## Run + +```bash +snow-cmdb-export server-app-map --out cmdb.xlsx +snow-cmdb-export server-app-map --out servers-only.xlsx --device-type server +snow-cmdb-export server-app-map --out filtered.xlsx \ + --query "operational_status=1^company.name=Acme" +snow-cmdb-export server-app-map --out audit.xlsx --raw +``` + +## Output schema (`device_app_map` sheet) + +| column | source | +| --- | --- | +| device_type | `server` or `client` | +| name | `cmdb_ci_*.name` | +| fqdn | `cmdb_ci_*.fqdn` (falls back to `dns_domain`) | +| environment | `cmdb_ci_environment.name` joined via `cmdb_rel_ci` | +| assigned_user | `sys_user.name` resolved from `assigned_to` | +| assigned_user_email | `sys_user.email` | +| business_app | `cmdb_ci_business_app.name` | +| app_owner | `business_owner` (fallback `managed_by`) | +| criticality | `business_criticality` | +| support_group | `support_group` on the CI | + +## Joining logic + +- **Servers → apps**: relationship rows on `cmdb_rel_ci` whose `type` display + value is one of `Runs on::Runs`, `Depends on::Used by`, `Hosted on::Hosts`. +- **Clients → apps**: same relationship lookup first; if none, fall back to + business apps where the client's assigned user is `business_owner`, + `managed_by`, or `owned_by`. Capped by `--max-apps-per-user` (default 10). +- **Environment**: any `cmdb_rel_ci` edge from the CI to a `cmdb_ci_environment`. diff --git a/servicenow-cmdb-export/pyproject.toml b/servicenow-cmdb-export/pyproject.toml new file mode 100644 index 0000000..2be4575 --- /dev/null +++ b/servicenow-cmdb-export/pyproject.toml @@ -0,0 +1,20 @@ +[build-system] +requires = ["setuptools>=68"] +build-backend = "setuptools.build_meta" + +[project] +name = "servicenow-cmdb-export" +version = "0.1.0" +description = "Export CMDB server/client to business-app and environment mappings from ServiceNow into Excel." +requires-python = ">=3.10" +dependencies = [ + "requests>=2.31", + "openpyxl>=3.1", + "click>=8.1", +] + +[project.scripts] +snow-cmdb-export = "servicenow_cmdb_export.cli:main" + +[tool.setuptools.packages.find] +include = ["servicenow_cmdb_export*"] diff --git a/servicenow-cmdb-export/requirements.txt b/servicenow-cmdb-export/requirements.txt new file mode 100644 index 0000000..aa58ab2 --- /dev/null +++ b/servicenow-cmdb-export/requirements.txt @@ -0,0 +1,3 @@ +requests>=2.31 +openpyxl>=3.1 +click>=8.1 diff --git a/servicenow-cmdb-export/servicenow_cmdb_export/__init__.py b/servicenow-cmdb-export/servicenow_cmdb_export/__init__.py new file mode 100644 index 0000000..3dc1f76 --- /dev/null +++ b/servicenow-cmdb-export/servicenow_cmdb_export/__init__.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/servicenow-cmdb-export/servicenow_cmdb_export/__main__.py b/servicenow-cmdb-export/servicenow_cmdb_export/__main__.py new file mode 100644 index 0000000..62db75e --- /dev/null +++ b/servicenow-cmdb-export/servicenow_cmdb_export/__main__.py @@ -0,0 +1,4 @@ +from servicenow_cmdb_export.cli import main + +if __name__ == "__main__": + main() diff --git a/servicenow-cmdb-export/servicenow_cmdb_export/cli.py b/servicenow-cmdb-export/servicenow_cmdb_export/cli.py new file mode 100644 index 0000000..b2dfe73 --- /dev/null +++ b/servicenow-cmdb-export/servicenow_cmdb_export/cli.py @@ -0,0 +1,74 @@ +"""CLI: snow-cmdb-export.""" +from __future__ import annotations + +import sys + +import click + +from servicenow_cmdb_export.client import ServiceNowClient, ServiceNowError +from servicenow_cmdb_export.exporter import write_workbook +from servicenow_cmdb_export.queries import build_rows, fetch_snapshot + + +@click.group() +@click.version_option() +def main() -> None: + """Export CMDB device-to-app-and-environment mappings from ServiceNow.""" + + +@main.command("server-app-map") +@click.option( + "--out", + "-o", + required=True, + type=click.Path(dir_okay=False, writable=True), + help="Output .xlsx path.", +) +@click.option( + "--device-type", + type=click.Choice(["all", "server", "client"]), + default="all", + show_default=True, +) +@click.option( + "--query", + default=None, + help="Optional ServiceNow encoded query applied to device tables (sysparm_query).", +) +@click.option( + "--max-apps-per-user", + type=int, + default=10, + show_default=True, + help="Cap apps per client (via assigned-user fallback). 0 = unlimited.", +) +@click.option( + "--raw/--no-raw", + default=False, + help="Also dump raw CMDB tables as additional sheets.", +) +def server_app_map( + out: str, device_type: str, query: str | None, max_apps_per_user: int, raw: bool +) -> None: + """Pull CMDB and write a denormalized device/app/env mapping to Excel.""" + try: + client = ServiceNowClient() + except ServiceNowError as e: + click.echo(f"error: {e}", err=True) + sys.exit(2) + + click.echo("Fetching CMDB snapshot from ServiceNow...", err=True) + snap = fetch_snapshot(client, device_filter=device_type, extra_query=query) + click.echo( + f" servers={len(snap.servers)} clients={len(snap.clients)} " + f"apps={len(snap.apps_by_id)} envs={len(snap.envs_by_id)}", + err=True, + ) + + rows = build_rows(snap, max_apps_per_user=max_apps_per_user) + n = write_workbook(out, rows, snapshot=snap, include_raw=raw) + click.echo(f"Wrote {n} rows to {out}", err=True) + + +if __name__ == "__main__": + main() diff --git a/servicenow-cmdb-export/servicenow_cmdb_export/client.py b/servicenow-cmdb-export/servicenow_cmdb_export/client.py new file mode 100644 index 0000000..2596809 --- /dev/null +++ b/servicenow-cmdb-export/servicenow_cmdb_export/client.py @@ -0,0 +1,94 @@ +"""Thin ServiceNow Table API client. + +Auth: HTTP Basic via env vars (SNOW_USER / SNOW_PASSWORD). Instance from +SNOW_INSTANCE (e.g. "acme.service-now.com" or full https URL). +""" +from __future__ import annotations + +import os +from typing import Iterable, Iterator +from urllib.parse import urljoin + +import requests + + +class ServiceNowError(RuntimeError): + pass + + +class ServiceNowClient: + DEFAULT_PAGE_SIZE = 1000 + + def __init__( + self, + instance: str | None = None, + user: str | None = None, + password: str | None = None, + timeout: int = 60, + ) -> None: + instance = instance or os.environ.get("SNOW_INSTANCE") + user = user or os.environ.get("SNOW_USER") + password = password or os.environ.get("SNOW_PASSWORD") + if not instance: + raise ServiceNowError("SNOW_INSTANCE not set") + if not user or not password: + raise ServiceNowError("SNOW_USER and SNOW_PASSWORD must be set") + + if not instance.startswith("http"): + instance = f"https://{instance}" + self.base_url = instance.rstrip("/") + "/" + self.timeout = timeout + self._session = requests.Session() + self._session.auth = (user, password) + self._session.headers.update( + {"Accept": "application/json", "Content-Type": "application/json"} + ) + + def table( + self, + name: str, + query: str | None = None, + fields: Iterable[str] | None = None, + page_size: int = DEFAULT_PAGE_SIZE, + ) -> Iterator[dict]: + """Yield rows from a ServiceNow table, paging through results.""" + url = urljoin(self.base_url, f"api/now/table/{name}") + params: dict[str, str] = { + "sysparm_limit": str(page_size), + "sysparm_display_value": "all", + "sysparm_exclude_reference_link": "true", + } + if query: + params["sysparm_query"] = query + if fields: + params["sysparm_fields"] = ",".join(fields) + + offset = 0 + while True: + params["sysparm_offset"] = str(offset) + resp = self._session.get(url, params=params, timeout=self.timeout) + if resp.status_code >= 400: + raise ServiceNowError( + f"GET {name} failed: {resp.status_code} {resp.text[:300]}" + ) + batch = resp.json().get("result", []) + if not batch: + return + for row in batch: + yield row + if len(batch) < page_size: + return + offset += page_size + + @staticmethod + def display(field) -> str: + """Pull display_value from a ServiceNow field that may be a dict or scalar.""" + if isinstance(field, dict): + return field.get("display_value") or field.get("value") or "" + return field or "" + + @staticmethod + def value(field) -> str: + if isinstance(field, dict): + return field.get("value") or "" + return field or "" diff --git a/servicenow-cmdb-export/servicenow_cmdb_export/exporter.py b/servicenow-cmdb-export/servicenow_cmdb_export/exporter.py new file mode 100644 index 0000000..a54ad84 --- /dev/null +++ b/servicenow-cmdb-export/servicenow_cmdb_export/exporter.py @@ -0,0 +1,79 @@ +"""Excel writer for CMDB export rows.""" +from __future__ import annotations + +from pathlib import Path +from typing import Iterable + +from openpyxl import Workbook +from openpyxl.styles import Font +from openpyxl.utils import get_column_letter + +from servicenow_cmdb_export.queries import CMDBSnapshot, Row + + +def write_workbook( + path: str | Path, + rows: Iterable[Row], + snapshot: CMDBSnapshot | None = None, + include_raw: bool = False, +) -> int: + wb = Workbook() + ws = wb.active + ws.title = "device_app_map" + + headers = Row.headers() + ws.append(headers) + for cell in ws[1]: + cell.font = Font(bold=True) + + count = 0 + for row in rows: + ws.append(row.as_list()) + count += 1 + + _autosize(ws, len(headers)) + ws.freeze_panes = "A2" + + if include_raw and snapshot is not None: + _add_raw_sheet(wb, "raw_servers", snapshot.servers) + _add_raw_sheet(wb, "raw_clients", snapshot.clients) + _add_raw_sheet(wb, "raw_apps", list(snapshot.apps_by_id.values())) + _add_raw_sheet(wb, "raw_envs", list(snapshot.envs_by_id.values())) + + wb.save(path) + return count + + +def _autosize(ws, col_count: int, max_width: int = 50) -> None: + for idx in range(1, col_count + 1): + letter = get_column_letter(idx) + longest = 0 + for cell in ws[letter]: + v = cell.value + if v is None: + continue + longest = max(longest, len(str(v))) + ws.column_dimensions[letter].width = min(longest + 2, max_width) + + +def _add_raw_sheet(wb: Workbook, title: str, records: list[dict]) -> None: + ws = wb.create_sheet(title=title) + if not records: + ws.append(["(no records)"]) + return + keys = sorted({k for r in records for k in r.keys()}) + ws.append(keys) + for cell in ws[1]: + cell.font = Font(bold=True) + for r in records: + ws.append([_flatten(r.get(k)) for k in keys]) + _autosize(ws, len(keys)) + ws.freeze_panes = "A2" + + +def _flatten(field) -> str: + if isinstance(field, dict): + return field.get("display_value") or field.get("value") or "" + if field is None: + return "" + return str(field) diff --git a/servicenow-cmdb-export/servicenow_cmdb_export/queries.py b/servicenow-cmdb-export/servicenow_cmdb_export/queries.py new file mode 100644 index 0000000..47ed9fc --- /dev/null +++ b/servicenow-cmdb-export/servicenow_cmdb_export/queries.py @@ -0,0 +1,223 @@ +"""CMDB extraction and joining. + +Strategy: +- Pull servers (cmdb_ci_server) and clients (cmdb_ci_pc_hardware) separately so + device_type is unambiguous. +- Pull cmdb_ci_business_app and cmdb_ci_environment. +- Pull cmdb_rel_ci once, indexed by parent and child sys_id, so we can resolve: + server -> business_app (via "Runs on::Runs" / "Depends on::Used by") + server -> environment (via any rel where the other side is an environment CI) + client -> environment (same) +- For clients without a CMDB-modeled app link, fall back to assigned_to user -> + apps where that user is business_owner / managed_by / owned_by. +""" +from __future__ import annotations + +from collections import defaultdict +from dataclasses import dataclass, field +from typing import Iterable + +from servicenow_cmdb_export.client import ServiceNowClient + +SERVER_TABLE = "cmdb_ci_server" +CLIENT_TABLE = "cmdb_ci_pc_hardware" +APP_TABLE = "cmdb_ci_business_app" +ENV_TABLE = "cmdb_ci_environment" +REL_TABLE = "cmdb_rel_ci" +USER_TABLE = "sys_user" + +# Relationship type display values that connect a CI to its hosted app. +APP_REL_TYPES = {"Runs on::Runs", "Depends on::Used by", "Hosted on::Hosts"} + + +@dataclass +class Row: + device_type: str + name: str + fqdn: str + environment: str + assigned_user: str + assigned_user_email: str + business_app: str + app_owner: str + criticality: str + support_group: str + + @staticmethod + def headers() -> list[str]: + return [ + "device_type", + "name", + "fqdn", + "environment", + "assigned_user", + "assigned_user_email", + "business_app", + "app_owner", + "criticality", + "support_group", + ] + + def as_list(self) -> list[str]: + return [getattr(self, h) for h in self.headers()] + + +@dataclass +class CMDBSnapshot: + servers: list[dict] = field(default_factory=list) + clients: list[dict] = field(default_factory=list) + apps_by_id: dict[str, dict] = field(default_factory=dict) + envs_by_id: dict[str, dict] = field(default_factory=dict) + users_by_id: dict[str, dict] = field(default_factory=dict) + # parent_sys_id -> list of (rel_type_display, child_sys_id, child_class) + rels_by_parent: dict[str, list[tuple[str, str, str]]] = field(default_factory=dict) + rels_by_child: dict[str, list[tuple[str, str, str]]] = field(default_factory=dict) + # sys_user.sys_id -> set of business_app sys_ids the user owns + apps_by_owner: dict[str, set[str]] = field(default_factory=dict) + + +def fetch_snapshot( + client: ServiceNowClient, + device_filter: str = "all", + extra_query: str | None = None, +) -> CMDBSnapshot: + snap = CMDBSnapshot() + disp = ServiceNowClient.display + val = ServiceNowClient.value + + if device_filter in ("all", "server"): + snap.servers = list(client.table(SERVER_TABLE, query=extra_query)) + if device_filter in ("all", "client"): + snap.clients = list(client.table(CLIENT_TABLE, query=extra_query)) + + for app in client.table(APP_TABLE): + snap.apps_by_id[val(app.get("sys_id"))] = app + owner_id = val(app.get("business_owner")) or val(app.get("managed_by")) or val(app.get("owned_by")) + if owner_id: + snap.apps_by_owner.setdefault(owner_id, set()).add(val(app.get("sys_id"))) + + for env in client.table(ENV_TABLE): + snap.envs_by_id[val(env.get("sys_id"))] = env + + # Collect user sys_ids referenced as assigned_to so we can resolve emails. + user_ids: set[str] = set() + for ci in (*snap.servers, *snap.clients): + uid = val(ci.get("assigned_to")) + if uid: + user_ids.add(uid) + if user_ids: + # Chunk the IN clause to avoid URL length limits. + ids = list(user_ids) + for i in range(0, len(ids), 200): + chunk = ids[i : i + 200] + q = "sys_idIN" + ",".join(chunk) + for u in client.table(USER_TABLE, query=q, fields=["sys_id", "name", "email"]): + snap.users_by_id[val(u.get("sys_id"))] = u + + for rel in client.table(REL_TABLE, fields=["parent", "child", "type"]): + parent = val(rel.get("parent")) + child = val(rel.get("child")) + rel_type = disp(rel.get("type")) + # Class hint isn't on the rel row directly — leave blank; we look up later. + snap.rels_by_parent.setdefault(parent, []).append((rel_type, child, "")) + snap.rels_by_child.setdefault(child, []).append((rel_type, parent, "")) + return snap + + +def _env_for_ci(snap: CMDBSnapshot, ci_sys_id: str) -> str: + disp = ServiceNowClient.display + for _rel_type, other_id, _ in snap.rels_by_parent.get(ci_sys_id, []): + env = snap.envs_by_id.get(other_id) + if env: + return disp(env.get("name")) + for _rel_type, other_id, _ in snap.rels_by_child.get(ci_sys_id, []): + env = snap.envs_by_id.get(other_id) + if env: + return disp(env.get("name")) + return "" + + +def _apps_for_server(snap: CMDBSnapshot, ci_sys_id: str) -> list[dict]: + apps: list[dict] = [] + seen: set[str] = set() + for rel_type, other_id, _ in snap.rels_by_parent.get(ci_sys_id, []): + if rel_type in APP_REL_TYPES and other_id in snap.apps_by_id and other_id not in seen: + apps.append(snap.apps_by_id[other_id]) + seen.add(other_id) + for rel_type, other_id, _ in snap.rels_by_child.get(ci_sys_id, []): + if rel_type in APP_REL_TYPES and other_id in snap.apps_by_id and other_id not in seen: + apps.append(snap.apps_by_id[other_id]) + seen.add(other_id) + return apps + + +def _apps_for_client( + snap: CMDBSnapshot, ci_sys_id: str, assigned_user_id: str +) -> list[dict]: + direct = _apps_for_server(snap, ci_sys_id) + if direct: + return direct + # Fallback: assigned user's owned apps. + if not assigned_user_id: + return [] + return [snap.apps_by_id[a] for a in snap.apps_by_owner.get(assigned_user_id, set())] + + +def build_rows( + snap: CMDBSnapshot, max_apps_per_user: int = 10 +) -> Iterable[Row]: + disp = ServiceNowClient.display + val = ServiceNowClient.value + + def emit(ci: dict, device_type: str): + sys_id = val(ci.get("sys_id")) + name = disp(ci.get("name")) + fqdn = disp(ci.get("fqdn")) or disp(ci.get("dns_domain")) + env = _env_for_ci(snap, sys_id) + user_id = val(ci.get("assigned_to")) + user = snap.users_by_id.get(user_id, {}) + assigned_user = disp(user.get("name")) or disp(ci.get("assigned_to")) + assigned_email = disp(user.get("email")) + support_group = disp(ci.get("support_group")) + + apps = ( + _apps_for_server(snap, sys_id) + if device_type == "server" + else _apps_for_client(snap, sys_id, user_id) + ) + if max_apps_per_user and len(apps) > max_apps_per_user: + apps = apps[:max_apps_per_user] + + if not apps: + yield Row( + device_type=device_type, + name=name, + fqdn=fqdn, + environment=env, + assigned_user=assigned_user, + assigned_user_email=assigned_email, + business_app="", + app_owner="", + criticality="", + support_group=support_group, + ) + return + + for app in apps: + yield Row( + device_type=device_type, + name=name, + fqdn=fqdn, + environment=env, + assigned_user=assigned_user, + assigned_user_email=assigned_email, + business_app=disp(app.get("name")), + app_owner=disp(app.get("business_owner")) or disp(app.get("managed_by")), + criticality=disp(app.get("business_criticality")), + support_group=support_group, + ) + + for ci in snap.servers: + yield from emit(ci, "server") + for ci in snap.clients: + yield from emit(ci, "client")