Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions packages/deepctl-cmd-mcp/src/deepctl_cmd_mcp/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@
from typing import TYPE_CHECKING, Any

from deepctl_core import AuthManager, BaseCommand, Config, DeepgramClient
from deepctl_core.output import stderr_console
from deepgram_mcp import TransportType, run_proxy
from deepgram_mcp.proxy import DEFAULT_BASE_URL
from rich.console import Console

from .models import MCPServerResult

if TYPE_CHECKING:
pass

console = Console()
# Use stderr for all output: when `dg mcp --transport stdio` is invoked by an
# MCP host (Claude Desktop, Codex, etc.), stdout is the JSON-RPC protocol
# channel and must not receive any human-readable messages.
console = stderr_console


class McpCommand(BaseCommand):
Expand Down Expand Up @@ -151,12 +154,18 @@ def handle(
)
)

if transport == "stdio":
# The MCP host owns stdout (JSON-RPC channel) and typically
# closes it on disconnect, so we return None to skip
# output_result rather than risk a write to a closed stream.
return None

return MCPServerResult(
status="success",
message="MCP proxy stopped",
transport=TransportType(transport),
port=port if transport != "stdio" else None,
host=host if transport != "stdio" else None,
port=port,
host=host,
)

except KeyboardInterrupt:
Expand Down
15 changes: 9 additions & 6 deletions packages/deepctl-cmd-mcp/tests/unit/test_mcp_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,13 @@ def test_handle_no_api_key(self):

@patch("deepctl_cmd_mcp.command.asyncio.run")
@patch("signal.signal")
def test_handle_stdio_transport(self, mock_signal, mock_asyncio_run):
"""Test handling stdio transport calls asyncio.run with run_proxy."""
def test_handle_stdio_transport_returns_none(self, mock_signal, mock_asyncio_run):
"""stdio mode returns None so the host's owned stdout is never written to.

Returning a result would invite ``output_result`` to print to stdout,
which the MCP host has typically closed. See DX-CLI-4/DX-CLI-5 in
Sentry for the regression we're guarding against.
"""
command = McpCommand()
config = MagicMock()
auth_manager = MagicMock()
Expand All @@ -93,9 +98,7 @@ def test_handle_stdio_transport(self, mock_signal, mock_asyncio_run):
)

mock_asyncio_run.assert_called_once()
assert isinstance(result, MCPServerResult)
assert result.status == "success"
assert result.transport == TransportType.STDIO
assert result is None

@patch("deepctl_cmd_mcp.command.asyncio.run")
@patch("signal.signal")
Expand Down Expand Up @@ -204,5 +207,5 @@ def test_handle_api_key_from_auth_manager(self):
transport="stdio",
)

assert result.status == "success"
assert result is None
mock_run.assert_called_once()
11 changes: 10 additions & 1 deletion packages/deepctl-core/src/deepctl_core/base_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,16 @@ def execute(self, ctx: click.Context, **kwargs: Any) -> None:
# Handle command result
if result is not None:
with TimingContext("output_processing"):
self.output_result(result, config)
try:
self.output_result(result, config)
except (BrokenPipeError, OSError):
# Downstream stream closed (e.g. an MCP host disconnected
# stdio after `dg mcp` finished). Nothing useful to log
# here because the logger writes to the same closed stream.
pass
except ValueError as exc:
if "closed file" not in str(exc):
raise

except KeyboardInterrupt:
self._tag_telemetry_status("cancelled")
Expand Down
93 changes: 93 additions & 0 deletions packages/deepctl-core/tests/unit/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1270,3 +1270,96 @@ def _param(name):
p = Mock()
p.name = name
return p


class TestExecuteStreamSafety:
"""The execute loop must not crash when stdout/stderr is closed.

Background: ``dg mcp --transport stdio`` runs under an MCP host (Claude
Desktop, Codex, etc.) which closes stdio when it disconnects. Any
write to the closed stream after that point raises ``ValueError:
I/O operation on closed file`` or ``BrokenPipeError``. We trap these
around the ``output_result`` call so the CLI exits cleanly instead of
surfacing a confusing crash to the user (or to Sentry).

See DX-CLI-4 / DX-CLI-5 in Sentry.
"""

@pytest.fixture
def mock_command_class(self):
class MockCommand(BaseCommand):
name = "test"
help = "Test command"

def handle(
self,
config: Config,
auth_manager: AuthManager,
client: DeepgramClient,
**kwargs: Any,
) -> Any:
return {"result": "ok"}

return MockCommand

@pytest.fixture
def mock_context(self):
ctx = Mock(spec=click.Context)
ctx.obj = {"config": Config()}
return ctx

@pytest.mark.unit
@patch("deepctl_core.base_command.AuthManager")
@patch("deepctl_core.base_command.DeepgramClient")
def test_execute_swallows_closed_stdout_value_error(
self,
_client_class,
_auth_class,
mock_command_class,
mock_context,
):
command = mock_command_class()
with patch.object(
command,
"output_result",
side_effect=ValueError("I/O operation on closed file."),
):
command.execute(mock_context)

@pytest.mark.unit
@patch("deepctl_core.base_command.AuthManager")
@patch("deepctl_core.base_command.DeepgramClient")
def test_execute_swallows_broken_pipe(
self,
_client_class,
_auth_class,
mock_command_class,
mock_context,
):
command = mock_command_class()
with patch.object(
command,
"output_result",
side_effect=BrokenPipeError(),
):
command.execute(mock_context)

@pytest.mark.unit
@patch("deepctl_core.base_command.AuthManager")
@patch("deepctl_core.base_command.DeepgramClient")
def test_execute_propagates_unrelated_value_error(
self,
_client_class,
_auth_class,
mock_command_class,
mock_context,
):
"""A ValueError that isn't about a closed stream is a real bug, surface it."""
command = mock_command_class()
with patch.object(
command,
"output_result",
side_effect=ValueError("totally unrelated"),
):
with pytest.raises(click.ClickException):
command.execute(mock_context)
33 changes: 33 additions & 0 deletions packages/deepctl-telemetry/src/deepctl_telemetry/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,46 @@ def _read_cli_version() -> str:
return importlib.metadata.version("deepctl")


_MCP_NOISE_LOGGERS = frozenset(
{
"mcp.client.streamable_http",
"mcp.server.lowlevel.server",
}
)


def _is_mcp_transient_noise(event: Event) -> bool:
"""Identify log events from the MCP SDK that are not actionable bugs.

`dg mcp` embeds the upstream `mcp` Python SDK, which logs to Sentry via
the logging integration whenever the upstream MCP server returns 5xx
(handled by the client) or the stdio peer closes mid-message (handled by
the server). Both are recovered internally. Surfacing them as Sentry
issues just creates triage cost for the DX team and hides real CLI bugs.

Any unhandled exception is kept regardless of logger.
"""
if (event.get("logger") or "") not in _MCP_NOISE_LOGGERS:
return False
for exc in (event.get("exception") or {}).get("values") or []:
mechanism = exc.get("mechanism") or {}
if mechanism.get("handled") is False:
return False
return True


def _scrub_event(event: Event, _hint: Hint) -> Event | None:
"""Drop request bodies, headers, and any user-identifying data.

Sentry SDK already filters most PII via send_default_pii=False, but
Auth tokens, project IDs, and file paths can still leak through
breadcrumbs and exception messages. This is a defense-in-depth scrub.

Also drops known-noise events (see ``_is_mcp_transient_noise``).
"""
if _is_mcp_transient_noise(event):
return None

request: dict[str, Any] = event.get("request") or {}
if "headers" in request:
request["headers"] = {}
Expand Down
118 changes: 118 additions & 0 deletions packages/deepctl-telemetry/tests/unit/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,121 @@ def test_flush_on_exit_swallows_exceptions(self) -> None:

with patch("sentry_sdk.flush", side_effect=RuntimeError("boom")):
_flush_on_exit()


def _handled_log_event(logger: str, exc_type: str, value: str) -> dict:
return {
"logger": logger,
"exception": {
"values": [
{
"type": exc_type,
"value": value,
"mechanism": {"type": "logging", "handled": True},
}
]
},
}


def _message_only_event(logger: str) -> dict:
return {"logger": logger}


class TestMcpNoiseFilter:
"""Drop handled MCP SDK log noise so Sentry stays actionable.

Anchors: DX-CLI-1 (HTTPStatusError 5xx from streamable_http) and
DX-CLI-3 (Invalid JSON: EOF from mcp.server.lowlevel.server).
Unhandled exceptions are always kept regardless of logger.
"""

def test_drops_streamable_http_503_log(self) -> None:
from deepctl_telemetry.client import _is_mcp_transient_noise

event = _handled_log_event(
"mcp.client.streamable_http",
"HTTPStatusError",
"Server error '503 Service Unavailable' for url",
)
assert _is_mcp_transient_noise(event)

def test_drops_lowlevel_server_eof_log(self) -> None:
from deepctl_telemetry.client import _is_mcp_transient_noise

event = _message_only_event("mcp.server.lowlevel.server")
assert _is_mcp_transient_noise(event)

def test_keeps_unhandled_exception_from_mcp_logger(self) -> None:
"""Unhandled crashes are real bugs even if the logger is MCP."""
from deepctl_telemetry.client import _is_mcp_transient_noise

event = {
"logger": "mcp.client.streamable_http",
"exception": {
"values": [
{
"type": "RuntimeError",
"value": "boom",
"mechanism": {"type": "excepthook", "handled": False},
}
]
},
}
assert not _is_mcp_transient_noise(event)

def test_keeps_non_mcp_logger_events(self) -> None:
from deepctl_telemetry.client import _is_mcp_transient_noise

event = _handled_log_event(
"deepctl_core.base_command",
"HTTPStatusError",
"Server error '503 Service Unavailable' for url",
)
assert not _is_mcp_transient_noise(event)

def test_handles_missing_fields(self) -> None:
from deepctl_telemetry.client import _is_mcp_transient_noise

assert not _is_mcp_transient_noise({})
assert not _is_mcp_transient_noise({"logger": None})
assert not _is_mcp_transient_noise({"logger": "deepctl"})
assert _is_mcp_transient_noise({"logger": "mcp.client.streamable_http"})

def test_scrub_event_returns_none_for_mcp_noise(self) -> None:
"""The before_send hook drops MCP noise entirely."""
from deepctl_telemetry.client import _scrub_event

event = _handled_log_event(
"mcp.client.streamable_http",
"HTTPStatusError",
"Server error '503 Service Unavailable' for url",
)
assert _scrub_event(event, {}) is None

def test_scrub_event_still_scrubs_normal_request_headers(self) -> None:
from deepctl_telemetry.client import _scrub_event

event = {
"logger": "deepctl",
"request": {
"headers": {"Authorization": "Bearer secret"},
"cookies": {"sid": "abc"},
"data": "{\"raw\":\"body\"}",
},
"user": {
"email": "x@example.com",
"ip_address": "1.2.3.4",
"username": "x",
"id": "user-1",
},
}
result = _scrub_event(event, {})
assert result is not None
assert result["request"]["headers"] == {}
assert result["request"]["cookies"] == {}
assert result["request"]["data"] == "[Filtered]"
assert "email" not in result["user"]
assert "ip_address" not in result["user"]
assert "username" not in result["user"]
assert result["user"]["id"] == "user-1"
Loading