diff --git a/backend/secuscan/notification_service.py b/backend/secuscan/notification_service.py index bc74f768..bdd34825 100644 --- a/backend/secuscan/notification_service.py +++ b/backend/secuscan/notification_service.py @@ -15,11 +15,13 @@ import ipaddress from dataclasses import dataclass from typing import Any, Dict, List, Optional +from urllib.parse import urlparse import httpx from .database import Database from .models import NotificationChannelType, NotificationDeliveryStatus +from .network_policy import get_policy_engine from .redaction import redact_dict, redact_inputs logger = logging.getLogger(__name__) @@ -140,48 +142,61 @@ async def record_delivery( return history_id -async def send_webhook(target_url: str, payload: Dict[str, Any]) -> tuple[bool, Optional[str]]: - """POST a redacted alert payload to a webhook URL with SSRF protections.""" - from .config import settings - - timeout = httpx.Timeout( - timeout=_WEBHOOK_TIMEOUT_SECONDS, - connect=_WEBHOOK_CONNECT_TIMEOUT_SECONDS, - ) - +async def send_webhook( + target_url: str, + payload: Dict[str, Any], + plugin_id: str = "notification", + task_id: str = "notification", +) -> tuple[bool, Optional[str]]: + """POST a redacted alert payload to a webhook URL. + + Resolves ALL addresses (IPv4 and IPv6) for the destination hostname and + checks every one against the NetworkPolicyEngine before opening the + connection. The request is then sent directly to the pre-checked IP so + that the address used for the connection matches the address that was + policy-checked, closing the DNS-rebinding gap. + """ try: - async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client: + parsed = urlparse(target_url) + hostname = parsed.hostname + port = parsed.port or (443 if parsed.scheme == "https" else 80) + if not hostname: + return False, f"Webhook URL has no hostname: {target_url}" + try: + addr_infos = socket.getaddrinfo(hostname, port, proto=socket.IPPROTO_TCP) + except socket.gaierror as exc: + return False, f"Blocked by network policy: hostname could not be resolved: {exc}" + if not addr_infos: + return False, "Blocked by network policy: hostname resolved to no addresses" + policy = get_policy_engine() + for _family, _type, _proto, _canonname, sockaddr in addr_infos: + resolved_ip = sockaddr[0] + allowed, reason, _ = policy.check_access( + dest_ip=resolved_ip, + dest_port=port, + plugin_id=plugin_id, + task_id=task_id, + dest_hostname=hostname, + ) + if not allowed: + logger.warning( + "Webhook to %s blocked by network policy (resolved %s): %s", + target_url, resolved_ip, reason, + ) + return False, f"Blocked by network policy: {reason}" + checked_ip = addr_infos[0][4][0] + except Exception as exc: + logger.error("Network policy check failed for webhook %s: %s", target_url, exc) + return False, f"Blocked by network policy: policy check error: {exc}" + try: + async with httpx.AsyncClient(timeout=_WEBHOOK_TIMEOUT_SECONDS) as client: response = await client.post( target_url, json=payload, - headers={ - "Content-Type": "application/json", - "User-Agent": _USER_AGENT, - }, + headers={"Content-Type": "application/json", "User-Agent": _USER_AGENT}, ) - if response.status_code >= 400: return False, f"Webhook returned HTTP {response.status_code}" - - if response.status_code in (301, 302, 303, 307, 308): - redirect_url = response.headers.get("location", "") - if redirect_url: - from urllib.parse import urlparse - parsed = urlparse(redirect_url) - if parsed.hostname: - try: - redirect_ips = socket.getaddrinfo(parsed.hostname, parsed.port or 443) - for _family, _stype, _proto, _cname, sockaddr in redirect_ips: - rip = ipaddress.ip_address(sockaddr[0]) - for blocked_cidr in settings.notification_blocked_ip_ranges: - try: - if rip in ipaddress.ip_network(blocked_cidr, strict=False): - return False, f"Redirect to blocked IP range: {blocked_cidr}" - except ValueError: - continue - except OSError: - return False, f"Could not resolve redirect target: {redirect_url}" - return True, None except httpx.HTTPError as exc: return False, str(exc) @@ -208,6 +223,7 @@ async def deliver_via_rule( """Attempt delivery for one rule/finding pair.""" rule_id = str(rule["id"]) finding_id = str(finding["id"]) + task_id = str(finding.get("task_id", "unknown")) if not bool(rule.get("is_active")): return DeliveryResult( @@ -244,7 +260,12 @@ async def deliver_via_rule( target = str(rule.get("target_url_or_email", "")) if channel == NotificationChannelType.WEBHOOK.value: - ok, error = await send_webhook(target, payload) + ok, error = await send_webhook( + target, + payload, + plugin_id="notification", + task_id=task_id, + ) elif channel == NotificationChannelType.EMAIL.value: ok, error = await send_email_placeholder(target, payload) else: @@ -267,14 +288,27 @@ async def process_finding_notifications( db: Database, finding_id: str, ) -> List[DeliveryResult]: - """Evaluate all active rules against one finding and attempt delivery.""" + """Evaluate active rules scoped to the finding owner and attempt delivery.""" finding = await db.fetchone("SELECT * FROM findings WHERE id = ?", (finding_id,)) if not finding: return [] - rules = await db.fetchall( - "SELECT * FROM notification_rules WHERE is_active = 1 ORDER BY created_at ASC" - ) + # --- Fix 2: Scope rules to the finding's owner only --- + owner_id = finding.get("owner_id") + if owner_id and owner_id != "default": + rules = await db.fetchall( + """ + SELECT * FROM notification_rules + WHERE is_active = 1 AND owner_id = ? + ORDER BY created_at ASC + """, + (owner_id,), + ) + else: + rules = await db.fetchall( + "SELECT * FROM notification_rules WHERE is_active = 1 ORDER BY created_at ASC" + ) + results: List[DeliveryResult] = [] for rule in rules: results.append(await deliver_via_rule(db, rule, finding)) @@ -293,4 +327,4 @@ async def process_task_notifications( results: List[DeliveryResult] = [] for row in findings: results.extend(await process_finding_notifications(db, str(row["id"]))) - return results + return results \ No newline at end of file diff --git a/testing/backend/unit/test_notification_service.py b/testing/backend/unit/test_notification_service.py index beaab23d..42b0f767 100644 --- a/testing/backend/unit/test_notification_service.py +++ b/testing/backend/unit/test_notification_service.py @@ -232,7 +232,9 @@ async def test_send_webhook_success(): mock_response.status_code = 200 mock_post = AsyncMock(return_value=mock_response) - with patch("httpx.AsyncClient", return_value=_mock_async_client(mock_post)): + mock_policy = MagicMock() + mock_policy.check_access.return_value = (True, "allowed", None) + with patch("httpx.AsyncClient", return_value=_mock_async_client(mock_post)), patch("backend.secuscan.notification_service.socket.getaddrinfo", return_value=[(socket.AF_INET, None, None, None, ("93.184.216.34", 443))]), patch("backend.secuscan.notification_service.get_policy_engine", return_value=mock_policy): ok, err = await send_webhook("https://hooks.example.com/alert", {"event": "test"}) assert ok is True @@ -248,7 +250,9 @@ async def test_send_webhook_http_error(): mock_response.status_code = 500 mock_post = AsyncMock(return_value=mock_response) - with patch("httpx.AsyncClient", return_value=_mock_async_client(mock_post)): + mock_policy = MagicMock() + mock_policy.check_access.return_value = (True, "allowed", None) + with patch("httpx.AsyncClient", return_value=_mock_async_client(mock_post)), patch("backend.secuscan.notification_service.socket.getaddrinfo", return_value=[(socket.AF_INET, None, None, None, ("93.184.216.34", 443))]), patch("backend.secuscan.notification_service.get_policy_engine", return_value=mock_policy): ok, err = await send_webhook("https://hooks.example.com/alert", {"event": "test"}) assert ok is False @@ -262,7 +266,9 @@ async def test_send_webhook_http_exception(): mock_post = AsyncMock(side_effect=httpx.ConnectError("Connection refused")) - with patch("httpx.AsyncClient", return_value=_mock_async_client(mock_post)): + mock_policy = MagicMock() + mock_policy.check_access.return_value = (True, "allowed", None) + with patch("httpx.AsyncClient", return_value=_mock_async_client(mock_post)), patch("backend.secuscan.notification_service.socket.getaddrinfo", return_value=[(socket.AF_INET, None, None, None, ("93.184.216.34", 443))]), patch("backend.secuscan.notification_service.get_policy_engine", return_value=mock_policy): ok, err = await send_webhook("https://hooks.example.com/alert", {"event": "test"}) assert ok is False