Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 184 additions & 43 deletions src/ccbot/bot/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import asyncio
import logging
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any

Expand Down Expand Up @@ -42,12 +44,24 @@
safe_reply,
send_with_fallback,
)
from ..handlers.notifications import lookup_session_for_message
from ..handlers.notifications import (
begin_repost_intent,
end_repost_intent,
enter_kb_mode,
lookup_session_for_message,
repost_card,
resume_card_view,
)
from ..session_models import Session
from ..handlers.inbox import save_inbox_file
from ..markdown_v2 import convert_markdown
from ..naming import maybe_auto_name
from ..session import session_manager
from ..terminal_parser import extract_bash_output, is_interactive_ui
from ..terminal_parser import (
extract_bash_output,
extract_interactive_content,
is_interactive_ui,
)
from ..tmux_manager import tmux_manager
from ..transcribe import transcribe_voice
from ..utils import ccbot_dir
Expand All @@ -56,6 +70,114 @@
logger = logging.getLogger(__name__)


class _RepostHandle:
"""Mutable flag used with :func:`_card_repost_bracket`. Call
:meth:`commit` after the pane send succeeded; the bracket then
reposts the live card on exit.
"""

__slots__ = ("do_repost",)

def __init__(self) -> None:
self.do_repost = False

def commit(self) -> None:
self.do_repost = True


@asynccontextmanager
async def _card_repost_bracket(
bot: Bot, user_id: int, sess: Session | None
) -> AsyncGenerator[_RepostHandle, None]:
"""Bracket a send-to-pane operation with the live-card repost machinery.

Entry: drop any Menu/sub-screen pause + arm ``repost_intent`` so a
concurrent ``update_session_card`` buffers events instead of spawning
a second card above the user's message.
Exit (only when caller invoked ``handle.commit()``): repost the card
below the user's message and drain buffered events into it.
Always: clear ``repost_intent`` so the live card unblocks for the
next turn.

No-op when ``sess`` is None (orphan window / no Session record).
"""
handle = _RepostHandle()
if sess is None:
yield handle
return
await resume_card_view(bot, user_id, sess)
begin_repost_intent(user_id, sess.id)
try:
yield handle
finally:
if handle.do_repost:
try:
await repost_card(bot, user_id, sess)
except Exception as e:
logger.debug("repost_card failed: %s", e)
end_repost_intent(user_id, sess.id)


async def _intercept_if_pending_ui(
bot: Bot,
user_id: int,
wid: str,
reply_to: Any,
) -> bool:
"""If the pane has a pending interactive UI, surface it and intercept.

Returns True iff the caller MUST NOT call ``send_to_window``: the
AskUserQuestion / ExitPlanMode / Permission prompt on the pane would
otherwise consume the user's text as menu keystrokes (digits select
options, Enter submits). Caller should ``return`` on True.

Surface preference:
- Active session (sess matches ``get_active_session``) → kb-mode
card via ``enter_kb_mode``. Idempotent: a no-op if the card is
already in kb-mode for the same prompt.
- Orphan window or bg session → legacy floating msg via
``handle_interactive_ui``.
"""
w = await tmux_manager.find_window_by_id(wid)
if not w:
return False
pane_text = await tmux_manager.capture_pane(w.window_id)
if not pane_text or not is_interactive_ui(pane_text):
return False
sess = session_manager.find_session_by_window(wid)
active = session_manager.get_active_session(user_id)
is_active = sess is not None and active is not None and active.id == sess.id
surfaced = False
if is_active and sess is not None:
content_obj = extract_interactive_content(pane_text)
if content_obj is not None:
await enter_kb_mode(
bot, user_id, sess, content_obj.content, content_obj.name
)
surfaced = True
if not surfaced:
await handle_interactive_ui(bot, user_id, wid)
logger.info(
"intercepted_user_msg_pending_ui user=%d wid=%s",
user_id,
wid,
extra={
"event": "intercepted_user_msg_pending_ui",
"user_id": user_id,
"window_id": wid,
},
)
try:
await safe_reply(
reply_to,
"⏳ Pending prompt above — answer it via the keyboard first. "
"Your message wasn't sent.",
)
except Exception:
pass
return True


# --- forward_command — any /command that has no dedicated handler goes here ---


Expand Down Expand Up @@ -100,19 +222,24 @@ async def forward_command_handler(
"window_id": wid,
},
)
success, message = await session_manager.send_to_window(wid, cc_slash)
if success:
# /clear: drop the session association so we re-detect once a new
# session id is written by the next user message.
if cc_slash.strip().lower() == "/clear":
logger.info("Clearing session for window %s after /clear", display)
session_manager.clear_window_session(wid)
await safe_reply(
update.message,
"🧹 Context cleared. Next message starts a fresh Claude session.",
)
else:
await safe_reply(update.message, f"❌ {message}")
if await _intercept_if_pending_ui(context.bot, user.id, wid, update.message):
return
sess = session_manager.find_session_by_window(wid)
async with _card_repost_bracket(context.bot, user.id, sess) as repost:
success, message = await session_manager.send_to_window(wid, cc_slash)
if success:
repost.commit()
# /clear: drop the session association so we re-detect once a
# new session id is written by the next user message.
if cc_slash.strip().lower() == "/clear":
logger.info("Clearing session for window %s after /clear", display)
session_manager.clear_window_session(wid)
await safe_reply(
update.message,
"🧹 Context cleared. Next message starts a fresh Claude session.",
)
else:
await safe_reply(update.message, f"❌ {message}")


# --- non-text catch-all ---
Expand Down Expand Up @@ -243,13 +370,17 @@ async def unsupported_content_handler(
"window_id": wid,
},
)
success, message = await session_manager.send_to_window(wid, text_to_send)
if not success:
await safe_reply(msg, f"❌ {message}")
if await _intercept_if_pending_ui(context.bot, user.id, wid, msg):
return
sess = session_manager.find_session_by_window(wid)
if sess is not None:
session_manager.touch_session(sess.id)
async with _card_repost_bracket(context.bot, user.id, sess) as repost:
success, message = await session_manager.send_to_window(wid, text_to_send)
if not success:
await safe_reply(msg, f"❌ {message}")
return
if sess is not None:
session_manager.touch_session(sess.id)
repost.commit()
# No success reply — the user just sent the message; they know
# they sent it. Errors above still surface.
return
Expand Down Expand Up @@ -356,12 +487,16 @@ async def _fetch(target: Path) -> None:
file_path = await save_inbox_file(workdir, filename, _fetch)

caption = update.message.caption or ""
success, message = await _forward_inbox_file(
user.id, wid, user.id, file_path, caption, "image", context.bot
)
if not success:
await safe_reply(update.message, f"❌ {message}")
if await _intercept_if_pending_ui(context.bot, user.id, wid, update.message):
return
async with _card_repost_bracket(context.bot, user.id, sess) as repost:
success, message = await _forward_inbox_file(
user.id, wid, user.id, file_path, caption, "image", context.bot
)
if not success:
await safe_reply(update.message, f"❌ {message}")
return
repost.commit()


async def document_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
Expand Down Expand Up @@ -407,12 +542,16 @@ async def _fetch(target: Path) -> None:
file_path = await save_inbox_file(workdir, filename, _fetch)

caption = update.message.caption or ""
success, message = await _forward_inbox_file(
user.id, wid, user.id, file_path, caption, "document", context.bot
)
if not success:
await safe_reply(update.message, f"❌ {message}")
if await _intercept_if_pending_ui(context.bot, user.id, wid, update.message):
return
async with _card_repost_bracket(context.bot, user.id, sess) as repost:
success, message = await _forward_inbox_file(
user.id, wid, user.id, file_path, caption, "document", context.bot
)
if not success:
await safe_reply(update.message, f"❌ {message}")
return
repost.commit()


# --- voice ---
Expand Down Expand Up @@ -486,11 +625,17 @@ async def voice_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N
},
)

success, message = await session_manager.send_to_window(wid, text)
if not success:
await safe_reply(update.message, f"❌ {message}")
if await _intercept_if_pending_ui(context.bot, user.id, wid, update.message):
return

sess = session_manager.find_session_by_window(wid)
async with _card_repost_bracket(context.bot, user.id, sess) as repost:
success, message = await session_manager.send_to_window(wid, text)
if not success:
await safe_reply(update.message, f"❌ {message}")
return
repost.commit()


# --- text + bash !cmd capture ---

Expand Down Expand Up @@ -685,16 +830,12 @@ async def text_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
# New message pushes pane content down — kill any in-flight bash capture.
cancel_bash_capture(user.id, wid)

# Catch interactive UIs that polling might have missed before sending.
pane_text = await tmux_manager.capture_pane(w.window_id)
if pane_text and is_interactive_ui(pane_text):
logger.info(
"Detected pending interactive UI before sending text (user=%d, window=%s)",
user.id,
wid,
)
await handle_interactive_ui(context.bot, user.id, wid)
await asyncio.sleep(0.3)
# Pending AskUserQuestion / ExitPlanMode / Permission on the pane
# would consume our keystrokes as menu navigation (digits select,
# Enter submits). Surface the prompt to the user and bail before
# send_to_window — the user must answer via the keyboard.
if await _intercept_if_pending_ui(context.bot, user.id, wid, update.message):
return

import time as _time

Expand Down
6 changes: 6 additions & 0 deletions tests/ccbot/test_forward_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ async def test_model_sends_command_to_tmux(self):
mock_sm.get_active_window.return_value = "@5"
mock_sm.get_display_name.return_value = "project"
mock_tmux.find_window_by_id = AsyncMock(return_value=MagicMock())
mock_tmux.capture_pane = AsyncMock(return_value="")
mock_sm.find_session_by_window.return_value = None
mock_sm.send_to_window = AsyncMock(return_value=(True, "ok"))

from ccbot.bot import forward_command_handler
Expand All @@ -72,6 +74,8 @@ async def test_cost_sends_command_to_tmux(self):
mock_sm.get_active_window.return_value = "@5"
mock_sm.get_display_name.return_value = "project"
mock_tmux.find_window_by_id = AsyncMock(return_value=MagicMock())
mock_tmux.capture_pane = AsyncMock(return_value="")
mock_sm.find_session_by_window.return_value = None
mock_sm.send_to_window = AsyncMock(return_value=(True, "ok"))

from ccbot.bot import forward_command_handler
Expand All @@ -96,6 +100,8 @@ async def test_clear_clears_session(self):
mock_sm.get_active_window.return_value = "@5"
mock_sm.get_display_name.return_value = "project"
mock_tmux.find_window_by_id = AsyncMock(return_value=MagicMock())
mock_tmux.capture_pane = AsyncMock(return_value="")
mock_sm.find_session_by_window.return_value = None
mock_sm.send_to_window = AsyncMock(return_value=(True, "ok"))

from ccbot.bot import forward_command_handler
Expand Down
Loading