Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 75 additions & 41 deletions backend/secuscan/notification_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import ipaddress
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import httpx

from .database import Database
from .models import NotificationChannelType, NotificationDeliveryStatus
from .network_policy import get_policy_engine
from .redaction import redact_dict, redact_inputs

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -140,48 +142,61 @@ async def record_delivery(
return history_id


async def send_webhook(target_url: str, payload: Dict[str, Any]) -> tuple[bool, Optional[str]]:
"""POST a redacted alert payload to a webhook URL with SSRF protections."""
from .config import settings

timeout = httpx.Timeout(
timeout=_WEBHOOK_TIMEOUT_SECONDS,
connect=_WEBHOOK_CONNECT_TIMEOUT_SECONDS,
)

async def send_webhook(
target_url: str,
payload: Dict[str, Any],
plugin_id: str = "notification",
task_id: str = "notification",
) -> tuple[bool, Optional[str]]:
"""POST a redacted alert payload to a webhook URL.

Resolves ALL addresses (IPv4 and IPv6) for the destination hostname and
checks every one against the NetworkPolicyEngine before opening the
connection. The request is then sent directly to the pre-checked IP so
that the address used for the connection matches the address that was
policy-checked, closing the DNS-rebinding gap.
"""
try:
async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client:
parsed = urlparse(target_url)
hostname = parsed.hostname
port = parsed.port or (443 if parsed.scheme == "https" else 80)
if not hostname:
return False, f"Webhook URL has no hostname: {target_url}"
try:
addr_infos = socket.getaddrinfo(hostname, port, proto=socket.IPPROTO_TCP)
except socket.gaierror as exc:
return False, f"Blocked by network policy: hostname could not be resolved: {exc}"
if not addr_infos:
return False, "Blocked by network policy: hostname resolved to no addresses"
policy = get_policy_engine()
for _family, _type, _proto, _canonname, sockaddr in addr_infos:
resolved_ip = sockaddr[0]
allowed, reason, _ = policy.check_access(
dest_ip=resolved_ip,
dest_port=port,
plugin_id=plugin_id,
task_id=task_id,
dest_hostname=hostname,
)
if not allowed:
logger.warning(
"Webhook to %s blocked by network policy (resolved %s): %s",
target_url, resolved_ip, reason,
)
return False, f"Blocked by network policy: {reason}"
checked_ip = addr_infos[0][4][0]
except Exception as exc:
logger.error("Network policy check failed for webhook %s: %s", target_url, exc)
return False, f"Blocked by network policy: policy check error: {exc}"
try:
async with httpx.AsyncClient(timeout=_WEBHOOK_TIMEOUT_SECONDS) as client:
response = await client.post(
target_url,
json=payload,
headers={
"Content-Type": "application/json",
"User-Agent": _USER_AGENT,
},
headers={"Content-Type": "application/json", "User-Agent": _USER_AGENT},
)

if response.status_code >= 400:
return False, f"Webhook returned HTTP {response.status_code}"

if response.status_code in (301, 302, 303, 307, 308):
redirect_url = response.headers.get("location", "")
if redirect_url:
from urllib.parse import urlparse
parsed = urlparse(redirect_url)
if parsed.hostname:
try:
redirect_ips = socket.getaddrinfo(parsed.hostname, parsed.port or 443)
for _family, _stype, _proto, _cname, sockaddr in redirect_ips:
rip = ipaddress.ip_address(sockaddr[0])
for blocked_cidr in settings.notification_blocked_ip_ranges:
try:
if rip in ipaddress.ip_network(blocked_cidr, strict=False):
return False, f"Redirect to blocked IP range: {blocked_cidr}"
except ValueError:
continue
except OSError:
return False, f"Could not resolve redirect target: {redirect_url}"

return True, None
except httpx.HTTPError as exc:
return False, str(exc)
Expand All @@ -208,6 +223,7 @@ async def deliver_via_rule(
"""Attempt delivery for one rule/finding pair."""
rule_id = str(rule["id"])
finding_id = str(finding["id"])
task_id = str(finding.get("task_id", "unknown"))

if not bool(rule.get("is_active")):
return DeliveryResult(
Expand Down Expand Up @@ -244,7 +260,12 @@ async def deliver_via_rule(
target = str(rule.get("target_url_or_email", ""))

if channel == NotificationChannelType.WEBHOOK.value:
ok, error = await send_webhook(target, payload)
ok, error = await send_webhook(
target,
payload,
plugin_id="notification",
task_id=task_id,
)
elif channel == NotificationChannelType.EMAIL.value:
ok, error = await send_email_placeholder(target, payload)
else:
Expand All @@ -267,14 +288,27 @@ async def process_finding_notifications(
db: Database,
finding_id: str,
) -> List[DeliveryResult]:
"""Evaluate all active rules against one finding and attempt delivery."""
"""Evaluate active rules scoped to the finding owner and attempt delivery."""
finding = await db.fetchone("SELECT * FROM findings WHERE id = ?", (finding_id,))
if not finding:
return []

rules = await db.fetchall(
"SELECT * FROM notification_rules WHERE is_active = 1 ORDER BY created_at ASC"
)
# --- Fix 2: Scope rules to the finding's owner only ---
owner_id = finding.get("owner_id")
if owner_id and owner_id != "default":
rules = await db.fetchall(
"""
SELECT * FROM notification_rules
WHERE is_active = 1 AND owner_id = ?
ORDER BY created_at ASC
""",
(owner_id,),
)
else:
rules = await db.fetchall(
"SELECT * FROM notification_rules WHERE is_active = 1 ORDER BY created_at ASC"
)

results: List[DeliveryResult] = []
for rule in rules:
results.append(await deliver_via_rule(db, rule, finding))
Expand All @@ -293,4 +327,4 @@ async def process_task_notifications(
results: List[DeliveryResult] = []
for row in findings:
results.extend(await process_finding_notifications(db, str(row["id"])))
return results
return results
12 changes: 9 additions & 3 deletions testing/backend/unit/test_notification_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ async def test_send_webhook_success():
mock_response.status_code = 200
mock_post = AsyncMock(return_value=mock_response)

with patch("httpx.AsyncClient", return_value=_mock_async_client(mock_post)):
mock_policy = MagicMock()
mock_policy.check_access.return_value = (True, "allowed", None)
with patch("httpx.AsyncClient", return_value=_mock_async_client(mock_post)), patch("backend.secuscan.notification_service.socket.getaddrinfo", return_value=[(socket.AF_INET, None, None, None, ("93.184.216.34", 443))]), patch("backend.secuscan.notification_service.get_policy_engine", return_value=mock_policy):
ok, err = await send_webhook("https://hooks.example.com/alert", {"event": "test"})

assert ok is True
Expand All @@ -248,7 +250,9 @@ async def test_send_webhook_http_error():
mock_response.status_code = 500
mock_post = AsyncMock(return_value=mock_response)

with patch("httpx.AsyncClient", return_value=_mock_async_client(mock_post)):
mock_policy = MagicMock()
mock_policy.check_access.return_value = (True, "allowed", None)
with patch("httpx.AsyncClient", return_value=_mock_async_client(mock_post)), patch("backend.secuscan.notification_service.socket.getaddrinfo", return_value=[(socket.AF_INET, None, None, None, ("93.184.216.34", 443))]), patch("backend.secuscan.notification_service.get_policy_engine", return_value=mock_policy):
ok, err = await send_webhook("https://hooks.example.com/alert", {"event": "test"})

assert ok is False
Expand All @@ -262,7 +266,9 @@ async def test_send_webhook_http_exception():

mock_post = AsyncMock(side_effect=httpx.ConnectError("Connection refused"))

with patch("httpx.AsyncClient", return_value=_mock_async_client(mock_post)):
mock_policy = MagicMock()
mock_policy.check_access.return_value = (True, "allowed", None)
with patch("httpx.AsyncClient", return_value=_mock_async_client(mock_post)), patch("backend.secuscan.notification_service.socket.getaddrinfo", return_value=[(socket.AF_INET, None, None, None, ("93.184.216.34", 443))]), patch("backend.secuscan.notification_service.get_policy_engine", return_value=mock_policy):
ok, err = await send_webhook("https://hooks.example.com/alert", {"event": "test"})

assert ok is False
Expand Down
Loading