diff --git a/CHANGELOG.md b/CHANGELOG.md index 9630a47..2969fae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased] +## [0.4.90] - 2026-03-16 + +### Added +- **Sidebar unread badges** - Left-rail navigation items for Messages, Channels, and Social Feed now show aggregate unread counts as compact pill badges that update via periodic polling and on window focus. Zero-state badges are hidden; counts cap visually at `99+`. +- **Durable feed-view acknowledgement** - Opening the Social Feed records a per-user acknowledgement timestamp so the feed unread badge reflects genuinely new activity since the last visit. Own-authored posts are excluded from the unread count. +- **Notification deep-link to exact messages** - Bell notification clicks for channel messages now navigate to the exact target message via a server-side focused context window, even when the message is older than the recent page. DM bell clicks include a `#message-` anchor for exact-message scrolling. +- **Container-aware focus scrolling** - Channel message focus now uses measured offsets within `#messages-container` instead of `scrollIntoView()`, and retries shortly after render to absorb layout shifts from async hydration. + +### Fixed +- **Bell duplicate counting for mention-bearing messages** - The notification bell now deduplicates by semantic activity key so a `channel_message` event and a `mention` event for the same source message increment the unread badge only once, with the higher-priority event winning the display slot. + ## [0.4.89] - 2026-03-15 ### Added diff --git a/canopy/__init__.py b/canopy/__init__.py index 9c819ad..aad90e5 100644 --- a/canopy/__init__.py +++ b/canopy/__init__.py @@ -11,7 +11,7 @@ Development: AI-assisted implementation (Claude, Codex, GitHub Copilot, Cursor IDE, Ollama) """ -__version__ = "0.4.89" +__version__ = "0.4.90" __protocol_version__ = 1 __author__ = "Canopy Contributors" __license__ = "Apache-2.0" diff --git a/canopy/core/channels.py b/canopy/core/channels.py index b9a3f30..12ee9c8 100644 --- a/canopy/core/channels.py +++ b/canopy/core/channels.py @@ -4424,11 +4424,112 @@ def purge_expired_channel_messages(self) -> List[Dict[str, Any]]: return purged - def get_channel_messages(self, channel_id: str, user_id: str, + def _row_to_channel_message(self, row: Any, row_type: str = "message") -> Optional[Message]: + """Best-effort row parser used across channel message query paths.""" + try: + try: + msg_type = MessageType(row['message_type']) + except (ValueError, KeyError): + msg_type = MessageType.TEXT + + created_at_raw = row['created_at'] or '' + try: + created_at = datetime.fromisoformat(created_at_raw.replace('Z', '+00:00')) + except (ValueError, AttributeError): + try: + created_at = datetime.strptime(created_at_raw, '%Y-%m-%d %H:%M:%S') + except Exception: + created_at = datetime.now() + + edited_at = None + if row['edited_at']: + try: + edited_at = datetime.fromisoformat(row['edited_at'].replace('Z', '+00:00')) + except (ValueError, AttributeError): + pass + + expires_at = self._parse_datetime(row['expires_at']) if 'expires_at' in row.keys() else None + content_text = row['content'] or '' + try: + crypto_state = (row['crypto_state'] or '').strip().lower() + except Exception: + crypto_state = '' + if not content_text and crypto_state == 'pending_decrypt': + content_text = '[Encrypted message pending key]' + elif not content_text and crypto_state == 'decrypt_failed': + content_text = '[Encrypted message could not be decrypted]' + + return Message( + id=row['id'], + channel_id=row['channel_id'], + user_id=row['user_id'], + content=content_text, + message_type=msg_type, + created_at=created_at, + thread_id=row['thread_id'], + parent_message_id=row['parent_message_id'], + reactions=json.loads(row['reactions']) if row['reactions'] else None, + attachments=json.loads(row['attachments']) if row['attachments'] else None, + security=json.loads(row['security']) if row['security'] else None, + edited_at=edited_at, + expires_at=expires_at, + origin_peer=row['origin_peer'] if 'origin_peer' in row.keys() else None, + crypto_state=row['crypto_state'] if 'crypto_state' in row.keys() else None, + ) + except Exception as row_err: + row_id = '?' + try: + row_id = row['id'] if 'id' in row.keys() else '?' + except Exception: + row_id = '?' + logger.warning(f"Skipping corrupt {row_type} row {row_id}: {row_err}") + return None + + def _hydrate_missing_parent_messages( + self, + conn: Any, + channel_id: str, + messages: List[Message], + ) -> List[Message]: + """Append missing parent/ancestor messages so replies render with context.""" + msg_ids = {m.id for m in messages} + missing_parent_ids = { + m.parent_message_id + for m in messages + if m.parent_message_id and m.parent_message_id not in msg_ids + } + while missing_parent_ids: + placeholders = ",".join("?" * len(missing_parent_ids)) + parent_rows = conn.execute( + f""" + SELECT m.*, u.username as author_username + FROM channel_messages m + LEFT JOIN users u ON m.user_id = u.id + WHERE m.channel_id = ? AND m.id IN ({placeholders}) + AND (m.expires_at IS NULL OR m.expires_at > CURRENT_TIMESTAMP) + """, + [channel_id] + list(missing_parent_ids), + ).fetchall() + if not parent_rows: + break + + next_missing_parent_ids = set() + for row in parent_rows: + message = self._row_to_channel_message(row, "parent") + if not message or message.id in msg_ids: + continue + messages.append(message) + msg_ids.add(message.id) + if message.parent_message_id and message.parent_message_id not in msg_ids: + next_missing_parent_ids.add(message.parent_message_id) + missing_parent_ids = next_missing_parent_ids + return messages + + def get_channel_messages(self, channel_id: str, user_id: str, limit: int = 50, before_message_id: Optional[str] = None) -> List[Message]: """Get messages from a channel.""" logger.debug(f"Getting messages for channel {channel_id}, user {user_id}, limit {limit}") - + try: access = self.get_channel_access_decision( channel_id=channel_id, @@ -4443,9 +4544,6 @@ def get_channel_messages(self, channel_id: str, user_id: str, return [] with self.db.get_connection() as conn: - # Build query — sort root messages by activity time - # (resurfaced parents appear at the bottom/newest position). - # Replies sort with their parent via COALESCE on the parent row. sort_expr = """ CASE WHEN m.parent_message_id IS NOT NULL THEN @@ -4477,11 +4575,8 @@ def get_channel_messages(self, channel_id: str, user_id: str, AND (m.expires_at IS NULL OR m.expires_at > CURRENT_TIMESTAMP) """ params: List[Any] = [channel_id] - + if before_message_id: - # Pagination must use the same sort tuple as ORDER BY to - # avoid gaps/duplicates when old threads are resurfaced by - # new replies. query += f""" AND ( {sort_expr} < ( @@ -4508,126 +4603,118 @@ def get_channel_messages(self, channel_id: str, user_id: str, before_message_id, before_message_id, ]) - + query += " ORDER BY sort_time DESC, m.created_at DESC LIMIT ?" params.append(limit) - - cursor = conn.execute(query, params) - rows = cursor.fetchall() - - def _row_to_message(row: Any, row_type: str = "message") -> Optional[Message]: - """Best-effort row parser used for primary query rows and recursively fetched parents.""" - try: - try: - msg_type = MessageType(row['message_type']) - except (ValueError, KeyError): - msg_type = MessageType.TEXT - - created_at_raw = row['created_at'] or '' - try: - created_at = datetime.fromisoformat(created_at_raw.replace('Z', '+00:00')) - except (ValueError, AttributeError): - try: - created_at = datetime.strptime(created_at_raw, '%Y-%m-%d %H:%M:%S') - except Exception: - created_at = datetime.now() - - edited_at = None - if row['edited_at']: - try: - edited_at = datetime.fromisoformat(row['edited_at'].replace('Z', '+00:00')) - except (ValueError, AttributeError): - pass - - expires_at = self._parse_datetime(row['expires_at']) if 'expires_at' in row.keys() else None - content_text = row['content'] or '' - try: - crypto_state = (row['crypto_state'] or '').strip().lower() - except Exception: - crypto_state = '' - if not content_text and crypto_state == 'pending_decrypt': - content_text = '[Encrypted message pending key]' - elif not content_text and crypto_state == 'decrypt_failed': - content_text = '[Encrypted message could not be decrypted]' - - return Message( - id=row['id'], - channel_id=row['channel_id'], - user_id=row['user_id'], - content=content_text, - message_type=msg_type, - created_at=created_at, - thread_id=row['thread_id'], - parent_message_id=row['parent_message_id'], - reactions=json.loads(row['reactions']) if row['reactions'] else None, - attachments=json.loads(row['attachments']) if row['attachments'] else None, - security=json.loads(row['security']) if row['security'] else None, - edited_at=edited_at, - expires_at=expires_at, - origin_peer=row['origin_peer'] if 'origin_peer' in row.keys() else None, - crypto_state=row['crypto_state'] if 'crypto_state' in row.keys() else None, - ) - except Exception as row_err: - row_id = '?' - try: - row_id = row['id'] if 'id' in row.keys() else '?' - except Exception: - row_id = '?' - logger.warning(f"Skipping corrupt {row_type} row {row_id}: {row_err}") - return None - # Convert to Message objects (skip corrupt rows gracefully) + rows = conn.execute(query, params).fetchall() messages: List[Message] = [] for row in rows: - message = _row_to_message(row, "message") + message = self._row_to_channel_message(row, "message") if message: messages.append(message) - - # Reverse to get chronological order - messages.reverse() - - # Include any missing parent/ancestor messages so replies render under the correct post. - # This walks parent chains recursively; one-level hydration can orphan deep reply chains. - msg_ids = {m.id for m in messages} - missing_parent_ids = { - m.parent_message_id - for m in messages - if m.parent_message_id and m.parent_message_id not in msg_ids - } - while missing_parent_ids: - placeholders = ",".join("?" * len(missing_parent_ids)) - parent_rows = conn.execute( - f""" - SELECT m.*, u.username as author_username - FROM channel_messages m - LEFT JOIN users u ON m.user_id = u.id - WHERE m.channel_id = ? AND m.id IN ({placeholders}) - AND (m.expires_at IS NULL OR m.expires_at > CURRENT_TIMESTAMP) - """, - [channel_id] + list(missing_parent_ids), - ).fetchall() - if not parent_rows: - break - next_missing_parent_ids = set() - for row in parent_rows: - message = _row_to_message(row, "parent") - if not message or message.id in msg_ids: - continue - messages.append(message) - msg_ids.add(message.id) - if message.parent_message_id and message.parent_message_id not in msg_ids: - next_missing_parent_ids.add(message.parent_message_id) - # Keep original page ordering stable for pagination cursors and only append ancestors. - missing_parent_ids = next_missing_parent_ids + messages.reverse() + messages = self._hydrate_missing_parent_messages(conn, channel_id, messages) logger.debug(f"Retrieved {len(messages)} messages from channel {channel_id}") return messages - + except Exception as e: logger.error(f"Failed to get channel messages: {e}", exc_info=True) return [] + def get_channel_message_context( + self, + channel_id: str, + message_id: str, + user_id: str, + radius: int = 12, + ) -> List[Message]: + """Return a focused message window around a specific message id.""" + if not channel_id or not message_id or not user_id: + return [] + try: + access = self.get_channel_access_decision( + channel_id=channel_id, + user_id=user_id, + require_membership=True, + ) + if not access.get('allowed'): + return [] + + radius = max(1, min(int(radius or 12), 50)) + with self.db.get_connection() as conn: + target_row = conn.execute( + """ + SELECT m.*, u.username as author_username + FROM channel_messages m + LEFT JOIN users u ON m.user_id = u.id + WHERE m.channel_id = ? AND m.id = ? + AND (m.expires_at IS NULL OR m.expires_at > CURRENT_TIMESTAMP) + LIMIT 1 + """, + (channel_id, message_id), + ).fetchone() + if not target_row: + return [] + + pivot_created_at = target_row['created_at'] + before_rows = conn.execute( + """ + SELECT m.*, u.username as author_username + FROM channel_messages m + LEFT JOIN users u ON m.user_id = u.id + WHERE m.channel_id = ? + AND (m.expires_at IS NULL OR m.expires_at > CURRENT_TIMESTAMP) + AND m.created_at < ? + ORDER BY m.created_at DESC + LIMIT ? + """, + (channel_id, pivot_created_at, radius), + ).fetchall() + after_rows = conn.execute( + """ + SELECT m.*, u.username as author_username + FROM channel_messages m + LEFT JOIN users u ON m.user_id = u.id + WHERE m.channel_id = ? + AND (m.expires_at IS NULL OR m.expires_at > CURRENT_TIMESTAMP) + AND m.created_at > ? + ORDER BY m.created_at ASC + LIMIT ? + """, + (channel_id, pivot_created_at, radius), + ).fetchall() + + rows: List[Any] = list(reversed(before_rows)) + [target_row] + list(after_rows) + messages: List[Message] = [] + seen_ids: set[str] = set() + for row in rows: + message = self._row_to_channel_message(row, "context") + if not message or message.id in seen_ids: + continue + seen_ids.add(message.id) + messages.append(message) + + messages = self._hydrate_missing_parent_messages(conn, channel_id, messages) + messages.sort( + key=lambda m: ( + (m.created_at.isoformat() if hasattr(m.created_at, 'isoformat') else str(m.created_at)), + m.id, + ) + ) + return messages + except Exception as e: + logger.error( + "Failed to get channel message context channel=%s message=%s: %s", + channel_id, + message_id, + e, + exc_info=True, + ) + return [] + def get_channel_message( self, channel_id: str, message_id: str, user_id: str ) -> Optional[Message]: @@ -4653,42 +4740,7 @@ def get_channel_message( ).fetchone() if not row: return None - try: - msg_type = MessageType(row['message_type']) - except (ValueError, KeyError): - msg_type = MessageType.TEXT - created_at_raw = row['created_at'] or '' - try: - created_at = datetime.fromisoformat(created_at_raw.replace('Z', '+00:00')) - except (ValueError, AttributeError): - try: - created_at = datetime.strptime(created_at_raw, '%Y-%m-%d %H:%M:%S') - except Exception: - created_at = datetime.now() - edited_at = None - if row.get('edited_at'): - try: - edited_at = datetime.fromisoformat(str(row['edited_at']).replace('Z', '+00:00')) - except (ValueError, AttributeError): - pass - expires_at = self._parse_datetime(row['expires_at']) if row.get('expires_at') else None - return Message( - id=row['id'], - channel_id=row['channel_id'], - user_id=row['user_id'], - content=row['content'] or '', - message_type=msg_type, - created_at=created_at, - thread_id=row.get('thread_id'), - parent_message_id=row.get('parent_message_id'), - reactions=json.loads(row['reactions']) if row.get('reactions') else None, - attachments=json.loads(row['attachments']) if row.get('attachments') else None, - security=json.loads(row['security']) if row.get('security') else None, - edited_at=edited_at, - expires_at=expires_at, - origin_peer=row.get('origin_peer'), - crypto_state=row.get('crypto_state'), - ) + return self._row_to_channel_message(row, "single") except Exception as e: logger.error(f"Failed to get channel message {message_id}: {e}", exc_info=True) return None diff --git a/canopy/core/database.py b/canopy/core/database.py index 87dd3a9..33ea0e2 100644 --- a/canopy/core/database.py +++ b/canopy/core/database.py @@ -244,6 +244,7 @@ def _initialize_database(self) -> None: CREATE TABLE IF NOT EXISTS user_feed_preferences ( user_id TEXT PRIMARY KEY, algorithm_json TEXT NOT NULL DEFAULT '{}', + last_viewed_at TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users (id) ); @@ -263,7 +264,10 @@ def _initialize_database(self) -> None: status TEXT DEFAULT 'pending', priority TEXT DEFAULT 'normal', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + seen_at TIMESTAMP, handled_at TIMESTAMP, + completed_at TIMESTAMP, + completion_ref_json TEXT, expires_at TIMESTAMP, triggered_by_inbox_id TEXT, depth INTEGER DEFAULT 0, @@ -324,6 +328,35 @@ def _initialize_database(self) -> None: CREATE INDEX IF NOT EXISTS idx_agent_presence_checkin ON agent_presence(last_checkin_at); + CREATE TABLE IF NOT EXISTS agent_runtime_state ( + user_id TEXT PRIMARY KEY, + last_event_fetch_at TIMESTAMP, + last_event_cursor_seen INTEGER, + last_inbox_fetch_at TIMESTAMP, + updated_at TIMESTAMP NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_agent_runtime_event_fetch + ON agent_runtime_state(last_event_fetch_at); + CREATE INDEX IF NOT EXISTS idx_agent_runtime_inbox_fetch + ON agent_runtime_state(last_inbox_fetch_at); + + CREATE TABLE IF NOT EXISTS agent_event_subscription_state ( + user_id TEXT PRIMARY KEY, + custom_enabled INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_agent_event_subscription_state_enabled + ON agent_event_subscription_state(custom_enabled, updated_at); + + CREATE TABLE IF NOT EXISTS agent_event_subscriptions ( + user_id TEXT NOT NULL, + event_type TEXT NOT NULL, + updated_at TIMESTAMP NOT NULL, + PRIMARY KEY (user_id, event_type) + ); + CREATE INDEX IF NOT EXISTS idx_agent_event_subscriptions_user + ON agent_event_subscriptions(user_id, updated_at); + -- Local workspace event journal (additive read/delivery model) CREATE TABLE IF NOT EXISTS workspace_events ( seq INTEGER PRIMARY KEY AUTOINCREMENT, @@ -631,6 +664,13 @@ def _run_migrations(self, conn: sqlite3.Connection) -> None: if 'status' in feed_columns: conn.execute("CREATE INDEX IF NOT EXISTS idx_feed_posts_status ON feed_posts(status)") conn.execute("CREATE INDEX IF NOT EXISTS idx_post_permissions_user ON post_permissions(user_id)") + + cursor = conn.execute("PRAGMA table_info(user_feed_preferences)") + user_feed_pref_columns = [row[1] for row in cursor.fetchall()] + if 'last_viewed_at' not in user_feed_pref_columns: + logger.info("Migration: Adding last_viewed_at column to user_feed_preferences") + conn.execute("ALTER TABLE user_feed_preferences ADD COLUMN last_viewed_at TIMESTAMP") + # channel_messages is created by ChannelManager — only add index if table exists cm_exists = conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='channel_messages'" @@ -704,6 +744,79 @@ def _run_migrations(self, conn: sqlite3.Connection) -> None: ON agent_presence(last_checkin_at); """) + ars_exists = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='agent_runtime_state'" + ).fetchone() + if not ars_exists: + logger.info("Migration: Creating agent_runtime_state table") + conn.executescript(""" + CREATE TABLE IF NOT EXISTS agent_runtime_state ( + user_id TEXT PRIMARY KEY, + last_event_fetch_at TIMESTAMP, + last_event_cursor_seen INTEGER, + last_inbox_fetch_at TIMESTAMP, + updated_at TIMESTAMP NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_agent_runtime_event_fetch + ON agent_runtime_state(last_event_fetch_at); + CREATE INDEX IF NOT EXISTS idx_agent_runtime_inbox_fetch + ON agent_runtime_state(last_inbox_fetch_at); + """) + + aess_exists = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='agent_event_subscription_state'" + ).fetchone() + if not aess_exists: + logger.info("Migration: Creating agent_event_subscription_state table") + conn.executescript(""" + CREATE TABLE IF NOT EXISTS agent_event_subscription_state ( + user_id TEXT PRIMARY KEY, + custom_enabled INTEGER NOT NULL DEFAULT 0, + updated_at TIMESTAMP NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_agent_event_subscription_state_enabled + ON agent_event_subscription_state(custom_enabled, updated_at); + """) + + aes_exists = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='agent_event_subscriptions'" + ).fetchone() + if not aes_exists: + logger.info("Migration: Creating agent_event_subscriptions table") + conn.executescript(""" + CREATE TABLE IF NOT EXISTS agent_event_subscriptions ( + user_id TEXT NOT NULL, + event_type TEXT NOT NULL, + updated_at TIMESTAMP NOT NULL, + PRIMARY KEY (user_id, event_type) + ); + CREATE INDEX IF NOT EXISTS idx_agent_event_subscriptions_user + ON agent_event_subscriptions(user_id, updated_at); + """) + + cursor = conn.execute("PRAGMA table_info(agent_inbox)") + inbox_columns = {row[1] for row in cursor.fetchall()} + if "seen_at" not in inbox_columns: + logger.info("Migration: Adding seen_at column to agent_inbox") + conn.execute("ALTER TABLE agent_inbox ADD COLUMN seen_at TIMESTAMP") + if "completed_at" not in inbox_columns: + logger.info("Migration: Adding completed_at column to agent_inbox") + conn.execute("ALTER TABLE agent_inbox ADD COLUMN completed_at TIMESTAMP") + if "completion_ref_json" not in inbox_columns: + logger.info("Migration: Adding completion_ref_json column to agent_inbox") + conn.execute("ALTER TABLE agent_inbox ADD COLUMN completion_ref_json TEXT") + + logger.info("Migration: Normalizing legacy handled inbox rows") + conn.execute( + """ + UPDATE agent_inbox + SET status = 'completed', + seen_at = COALESCE(seen_at, handled_at, created_at), + completed_at = COALESCE(completed_at, handled_at) + WHERE status = 'handled' + """ + ) + # Migration: content_contexts table for best-effort extracted text context conn.executescript(""" CREATE TABLE IF NOT EXISTS content_contexts ( @@ -1331,6 +1444,9 @@ def _exec_optional(sql: str, params: tuple[Any, ...]) -> None: _exec_optional("DELETE FROM channel_member_sync_deliveries WHERE target_user_id = ?", (user_id,)) _exec_optional("DELETE FROM likes WHERE user_id = ?", (user_id,)) _exec_optional("DELETE FROM agent_presence WHERE user_id = ?", (user_id,)) + _exec_optional("DELETE FROM agent_runtime_state WHERE user_id = ?", (user_id,)) + _exec_optional("DELETE FROM agent_event_subscription_state WHERE user_id = ?", (user_id,)) + _exec_optional("DELETE FROM agent_event_subscriptions WHERE user_id = ?", (user_id,)) # Channel messages (table in channels.py, same DB): likes then parent refs then messages try: diff --git a/canopy/core/feed.py b/canopy/core/feed.py index ca5b474..f1dbbd2 100644 --- a/canopy/core/feed.py +++ b/canopy/core/feed.py @@ -1001,6 +1001,79 @@ def save_feed_algorithm(self, user_id: str, algo: FeedAlgorithm) -> bool: logger.error(f"Failed to save feed algorithm for {user_id}: {e}") return False + def get_feed_last_viewed_at(self, user_id: str) -> Optional[datetime]: + """Return the last time the user acknowledged the feed view.""" + try: + with self.db.get_connection() as conn: + row = conn.execute( + "SELECT last_viewed_at FROM user_feed_preferences WHERE user_id = ?", + (user_id,), + ).fetchone() + if not row: + return None + return self._parse_datetime(row['last_viewed_at']) + except Exception as e: + logger.warning(f"Failed to load feed last_viewed_at for {user_id}: {e}") + return None + + def mark_feed_viewed(self, user_id: str, viewed_at: Optional[datetime] = None) -> bool: + """Record that the user has intentionally viewed the feed.""" + if not user_id: + return False + viewed_dt = viewed_at or datetime.now(timezone.utc) + viewed_db = self._format_db_timestamp(viewed_dt) + try: + with self.db.get_connection() as conn: + conn.execute(""" + INSERT INTO user_feed_preferences (user_id, algorithm_json, last_viewed_at, updated_at) + VALUES (?, '{}', ?, CURRENT_TIMESTAMP) + ON CONFLICT(user_id) DO UPDATE SET + last_viewed_at = excluded.last_viewed_at, + updated_at = CURRENT_TIMESTAMP + """, (user_id, viewed_db)) + conn.commit() + return True + except Exception as e: + logger.error(f"Failed to mark feed viewed for {user_id}: {e}") + return False + + def count_unread_posts(self, user_id: str, *, exclude_own_posts: bool = True) -> int: + """Count feed posts with new activity since the user's last acknowledged feed view.""" + if not user_id: + return 0 + + last_viewed_at = self.get_feed_last_viewed_at(user_id) + params: List[Any] = [user_id, user_id] + own_clause = "" + if exclude_own_posts: + own_clause = " AND p.author_id != ?" + params.append(user_id) + since_clause = "" + if last_viewed_at: + since_clause = " AND COALESCE(p.last_activity_at, p.created_at) > ?" + params.append(self._format_db_timestamp(last_viewed_at)) + + try: + with self.db.get_connection() as conn: + row = conn.execute(f""" + SELECT COUNT(DISTINCT p.id) AS unread_count + FROM feed_posts p + LEFT JOIN post_permissions pp ON p.id = pp.post_id + WHERE ( + p.visibility = 'public' OR + p.visibility = 'network' OR + (p.visibility = 'custom' AND pp.user_id = ?) OR + p.author_id = ? + ) + AND (p.expires_at IS NULL OR p.expires_at > CURRENT_TIMESTAMP) + {own_clause} + {since_clause} + """, params).fetchone() + return max(0, int((row['unread_count'] if row else 0) or 0)) + except Exception as e: + logger.error(f"Failed to count unread feed posts for {user_id}: {e}") + return 0 + def get_available_tags(self, limit: int = 50) -> List[Dict[str, Any]]: """Get popular tags across all posts for the tag picker UI.""" try: diff --git a/canopy/ui/routes.py b/canopy/ui/routes.py index de29404..2a8303d 100644 --- a/canopy/ui/routes.py +++ b/canopy/ui/routes.py @@ -433,10 +433,11 @@ def inject_sidebar_peers(): if not _is_authenticated(): return {} try: - db_manager, _, trust_manager, message_manager, channel_manager, _, _, _, profile_manager, _, p2p_manager = _get_app_components_any(current_app) + db_manager, _, trust_manager, message_manager, channel_manager, _, feed_manager, _, profile_manager, _, p2p_manager = _get_app_components_any(current_app) workspace_event_manager = current_app.config.get('WORKSPACE_EVENT_MANAGER') connected_peers = p2p_manager.get_connected_peers() if p2p_manager else [] local_peer_id = p2p_manager.get_peer_id() if p2p_manager else None + current_user_id = session.get('user_id') peer_snapshot = _build_sidebar_peer_snapshot( list(connected_peers or []), trust_manager, @@ -462,6 +463,15 @@ def inject_sidebar_peers(): 'sidebar_dm_event_cursor': int((workspace_event_manager.get_latest_seq() if workspace_event_manager else 0) or 0), 'sidebar_local_peer_id': local_peer_id, } + attention_snapshot = _build_sidebar_attention_summary( + db_manager, + channel_manager, + feed_manager, + p2p_manager, + current_user_id, + ) + out['sidebar_attention_summary'] = attention_snapshot['summary'] + out['sidebar_attention_rev'] = attention_snapshot['rev'] # Admin link and badge (instance owner only) owner_id = db_manager.get_instance_owner_user_id() if owner_id and session.get('user_id') == owner_id: @@ -1571,6 +1581,39 @@ def _build_sidebar_dm_snapshot( 'dm_rev': _stable_ui_revision(contacts), } + def _count_sidebar_message_unread( + db_manager: Any, + user_id: str, + ) -> int: + """Count unread direct and group messages for the global sidebar summary.""" + if not db_manager or not user_id: + return 0 + try: + with db_manager.get_connection() as conn: + row = conn.execute( + """ + SELECT COUNT(DISTINCT m.id) AS unread_count + FROM messages m + WHERE m.read_at IS NULL + AND COALESCE(m.sender_id, '') != ? + AND ( + m.recipient_id = ? + OR EXISTS ( + SELECT 1 + FROM json_each( + CASE WHEN json_valid(m.metadata) THEN m.metadata ELSE '{}' END, + '$.group_members' + ) gm + WHERE CAST(gm.value AS TEXT) = ? + ) + ) + """, + (user_id, user_id, user_id), + ).fetchone() + return max(0, int((row['unread_count'] if row else 0) or 0)) + except Exception: + return 0 + def _build_channel_sidebar_snapshot( channel_manager: Any, p2p_manager: Any, @@ -1612,6 +1655,44 @@ def _build_channel_sidebar_snapshot( 'rev': _stable_ui_revision(payload), } + def _build_sidebar_attention_summary( + db_manager: Any, + channel_manager: Any, + feed_manager: Any, + p2p_manager: Any, + user_id: str, + ) -> dict[str, Any]: + """Build aggregate unread counts for sidebar navigation badges.""" + messages_unread = _count_sidebar_message_unread(db_manager, user_id) + + channels_unread = 0 + try: + channel_snapshot = _build_channel_sidebar_snapshot(channel_manager, p2p_manager, user_id) + channels_unread = sum( + max(0, int((entry.get('unread_count') or 0))) + for entry in (channel_snapshot.get('channels') or []) + ) + except Exception: + channels_unread = 0 + + feed_unread = 0 + try: + if feed_manager and hasattr(feed_manager, 'count_unread_posts'): + feed_unread = max(0, int(feed_manager.count_unread_posts(user_id) or 0)) + except Exception: + feed_unread = 0 + + summary = { + 'messages': messages_unread, + 'channels': channels_unread, + 'feed': feed_unread, + 'total': messages_unread + channels_unread + feed_unread, + } + return { + 'summary': summary, + 'rev': _stable_ui_revision(summary), + } + def _build_sidebar_dm_contacts( db_manager: Any, profile_manager: Any, @@ -3122,6 +3203,11 @@ def feed(): try: db_manager, _, _, _, _, file_manager, feed_manager, interaction_manager, profile_manager, config, p2p_manager = _get_app_components_any(current_app) user_id = get_current_user() + if feed_manager and hasattr(feed_manager, 'mark_feed_viewed'): + try: + feed_manager.mark_feed_viewed(user_id) + except Exception: + pass # Get query parameters algorithm = request.args.get('algorithm', 'chronological') @@ -4532,6 +4618,37 @@ def ajax_sidebar_dm_snapshot(): logger.error(f"Sidebar DM snapshot error: {e}", exc_info=True) return jsonify({'success': False, 'error': 'Failed to load DM sidebar snapshot'}), 500 + @ui.route('/ajax/sidebar_attention_summary', methods=['GET']) + @require_login + def ajax_sidebar_attention_summary(): + """Return aggregate unread counts for sidebar navigation badges.""" + try: + db_manager, _, _, _, channel_manager, _, feed_manager, _, _, _, p2p_manager = _get_app_components_any(current_app) + client_rev = str(request.args.get('rev') or '').strip() + snapshot = _build_sidebar_attention_summary( + db_manager, + channel_manager, + feed_manager, + p2p_manager, + get_current_user(), + ) + changed = snapshot['rev'] != client_rev + return jsonify({ + 'success': True, + 'changed': changed, + 'rev': snapshot['rev'], + 'summary': snapshot['summary'] if changed else {}, + }) + except Exception as e: + logger.error(f"Sidebar attention summary error: {e}", exc_info=True) + return jsonify({ + 'success': False, + 'changed': False, + 'rev': '', + 'summary': {'messages': 0, 'channels': 0, 'feed': 0, 'total': 0}, + 'error': 'Failed to load sidebar attention summary', + }), 500 + @ui.route('/ajax/p2p/diagnostics', methods=['GET']) @require_login def ajax_p2p_diagnostics(): @@ -9852,6 +9969,7 @@ def _user_display(uid: str) -> Optional[dict[str, Any]]: limit = int(request.args.get('limit', 50)) before_message_id = request.args.get('before') + focus_message_id = str(request.args.get('focus_message') or '').strip() logger.debug(f"Get channel messages request: user_id={user_id}, channel_id={channel_id}, limit={limit}") @@ -9895,6 +10013,34 @@ def _user_display(uid: str) -> Optional[dict[str, Any]]: messages = channel_manager.get_channel_messages( channel_id, user_id, limit, before_message_id ) + focus_context_mode = 'recent' + focus_message_found = not focus_message_id + if focus_message_id: + focus_message_found = any( + str(getattr(message, 'id', '') or '') == focus_message_id + for message in messages + ) + if ( + not focus_message_found + and hasattr(channel_manager, 'get_channel_message_context') + ): + try: + focused_messages = channel_manager.get_channel_message_context( + channel_id=channel_id, + message_id=focus_message_id, + user_id=user_id, + ) + except Exception: + focused_messages = [] + if focused_messages: + messages = focused_messages + focus_context_mode = 'context' + focus_message_found = any( + str(getattr(message, 'id', '') or '') == focus_message_id + for message in messages + ) + else: + focus_context_mode = 'missing' logger.debug(f"Retrieved {len(messages)} messages for channel {channel_id}") @@ -10392,12 +10538,18 @@ def _user_display(uid: str) -> Optional[dict[str, Any]]: logger.warning(f"Skipping message {getattr(message, 'id', '?')} in response: {msg_err}") continue - return jsonify({ + payload = { 'messages': messages_data, 'channel_id': channel_id, 'count': len(messages_data), 'workspace_event_cursor': workspace_event_cursor, - }) + 'focus_message_id': focus_message_id or None, + 'focus_message_found': focus_message_found, + 'focus_context_mode': focus_context_mode if focus_message_id else None, + } + if focus_message_id and not focus_message_found: + payload['warning'] = 'Target message is no longer available in this channel.' + return jsonify(payload) except Exception as e: logger.error(f"Get channel messages error: {e}", exc_info=True) diff --git a/canopy/ui/static/js/canopy-main.js b/canopy/ui/static/js/canopy-main.js index 2980527..912b95d 100644 --- a/canopy/ui/static/js/canopy-main.js +++ b/canopy/ui/static/js/canopy-main.js @@ -343,6 +343,8 @@ const canopyInitialRecentDmContacts = window.CANOPY_VARS ? (window.CANOPY_VARS.recentDmContacts || []) : []; const canopyInitialDmRev = window.CANOPY_VARS ? (window.CANOPY_VARS.dmRev || '') : ''; const canopyInitialDmEventCursor = window.CANOPY_VARS ? Number(window.CANOPY_VARS.dmEventCursor || 0) : 0; + const canopyInitialAttentionSummary = window.CANOPY_VARS ? (window.CANOPY_VARS.attentionSummary || { messages: 0, channels: 0, feed: 0, total: 0 }) : { messages: 0, channels: 0, feed: 0, total: 0 }; + const canopyInitialAttentionRev = window.CANOPY_VARS ? (window.CANOPY_VARS.attentionRev || '') : ''; const canopyLocalPeerId = window.CANOPY_VARS ? String(window.CANOPY_VARS.localPeerId || '').trim() : ''; const SIDEBAR_VISIBLE_PEER_LIMIT = 12; window.canopyPeerProfiles = canopyPeerProfiles || {}; @@ -833,6 +835,9 @@ canopySidebarDmState.contacts = []; } canopyRenderSidebarDmContacts(canopySidebarDmState.contacts); + if (window.requestCanopySidebarAttentionRefresh) { + window.requestCanopySidebarAttentionRefresh({ force: true }).catch(() => {}); + } }; function requestCanopySidebarDmRefresh(options) { @@ -948,6 +953,129 @@ startCanopySidebarDmPolling(); }); + function formatSidebarUnreadCount(count) { + const normalized = Math.max(0, Number(count) || 0); + return normalized > 99 ? '99+' : String(normalized); + } + + function setSidebarNavUnreadBadge(kind, count) { + const badge = document.getElementById(`sidebar-nav-${kind}-badge`); + if (!badge) return; + const normalized = Math.max(0, Number(count) || 0); + if (normalized <= 0) { + badge.hidden = true; + badge.textContent = '0'; + badge.removeAttribute('aria-label'); + return; + } + badge.hidden = false; + badge.textContent = formatSidebarUnreadCount(normalized); + badge.setAttribute('aria-label', `${normalized} unread ${kind}`); + } + + function renderSidebarAttentionSummary(summary) { + const safeSummary = summary && typeof summary === 'object' ? summary : {}; + setSidebarNavUnreadBadge('messages', safeSummary.messages || 0); + setSidebarNavUnreadBadge('channels', safeSummary.channels || 0); + setSidebarNavUnreadBadge('feed', safeSummary.feed || 0); + } + + const canopySidebarAttentionState = { + currentRev: canopyInitialAttentionRev || '', + summary: { + messages: Math.max(0, Number(canopyInitialAttentionSummary.messages || 0)), + channels: Math.max(0, Number(canopyInitialAttentionSummary.channels || 0)), + feed: Math.max(0, Number(canopyInitialAttentionSummary.feed || 0)), + total: Math.max(0, Number(canopyInitialAttentionSummary.total || 0)), + }, + inFlight: false, + queued: false, + pollHandle: null, + }; + + function requestCanopySidebarAttentionRefresh(options) { + const opts = options || {}; + if (canopySidebarAttentionState.inFlight) { + canopySidebarAttentionState.queued = true; + return Promise.resolve({ queued: true }); + } + + canopySidebarAttentionState.inFlight = true; + const routes = (window.CANOPY_VARS && window.CANOPY_VARS.urls) || {}; + const endpoint = routes.sidebarAttentionSummary || '/ajax/sidebar_attention_summary'; + const query = new URLSearchParams(); + if (!opts.force && canopySidebarAttentionState.currentRev) { + query.set('rev', String(canopySidebarAttentionState.currentRev || '')); + } + + return fetch(`${endpoint}${query.toString() ? `?${query.toString()}` : ''}`, { + headers: { 'X-Requested-With': 'XMLHttpRequest' } + }) + .then((res) => { + if (!res.ok) { + throw new Error(`Sidebar attention summary failed (${res.status})`); + } + return res.json(); + }) + .then((data) => { + if (!data || data.success === false) return data || null; + if (data.rev) { + canopySidebarAttentionState.currentRev = String(data.rev || ''); + } + if (data.changed === false) { + return data; + } + const summary = data.summary && typeof data.summary === 'object' ? data.summary : {}; + canopySidebarAttentionState.summary = { + messages: Math.max(0, Number(summary.messages || 0)), + channels: Math.max(0, Number(summary.channels || 0)), + feed: Math.max(0, Number(summary.feed || 0)), + total: Math.max(0, Number(summary.total || 0)), + }; + renderSidebarAttentionSummary(canopySidebarAttentionState.summary); + return data; + }) + .catch(() => null) + .finally(() => { + canopySidebarAttentionState.inFlight = false; + if (canopySidebarAttentionState.queued) { + canopySidebarAttentionState.queued = false; + window.setTimeout(() => { + requestCanopySidebarAttentionRefresh({ force: false }).catch(() => {}); + }, 0); + } + }); + } + + function startCanopySidebarAttentionPolling() { + const hasAnyBadge = document.getElementById('sidebar-nav-messages-badge') + || document.getElementById('sidebar-nav-channels-badge') + || document.getElementById('sidebar-nav-feed-badge'); + if (!hasAnyBadge) return; + renderSidebarAttentionSummary(canopySidebarAttentionState.summary); + if (canopySidebarAttentionState.pollHandle) { + window.clearInterval(canopySidebarAttentionState.pollHandle); + } + requestCanopySidebarAttentionRefresh({ force: false }).catch(() => {}); + canopySidebarAttentionState.pollHandle = window.setInterval(() => { + requestCanopySidebarAttentionRefresh({ force: false }).catch(() => {}); + }, 12000); + document.addEventListener('visibilitychange', function() { + if (document.visibilityState === 'visible') { + requestCanopySidebarAttentionRefresh({ force: false }).catch(() => {}); + } + }); + window.addEventListener('focus', function() { + requestCanopySidebarAttentionRefresh({ force: false }).catch(() => {}); + }); + } + + window.requestCanopySidebarAttentionRefresh = requestCanopySidebarAttentionRefresh; + + document.addEventListener('DOMContentLoaded', function() { + startCanopySidebarAttentionPolling(); + }); + window.renderAvatarStack = function(container, options) { if (!container || !options) return; const userLabel = options.userLabel || options.userId || 'User'; @@ -4241,7 +4369,7 @@ let notificationCount = 0; let events = []; const seenEventIds = new Set(); - const mentionRefs = new Set(); + const unreadSemanticKeys = new Set(); let initialized = false; const localUserId = (window.CANOPY_VARS && window.CANOPY_VARS.localUserId) || null; const routes = { @@ -4369,6 +4497,47 @@ return null; } + function activitySemanticKey(evt) { + if (!evt) return null; + const ref = evt.ref || {}; + const kind = String(evt.kind || '').trim(); + if (ref.message_id) { + if (kind === 'direct_message') return `dm:${ref.message_id}`; + return `msg:${ref.message_id}`; + } + if (ref.post_id) return `post:${ref.post_id}`; + if (kind === 'channel_added' && ref.channel_id && ref.user_id) { + return `channel_added:${ref.channel_id}:${ref.user_id}`; + } + if (kind === 'interaction' && ref.item_type && ref.item_id) { + return `interaction:${ref.item_type}:${ref.item_id}:${ref.action || ''}:${ref.user_id || ''}`; + } + return evt.id || `${evt.peer_id || ''}:${kind}:${evt.timestamp || ''}`; + } + + function activityPriority(evt) { + const kind = String(evt && evt.kind || '').trim(); + if (kind === 'mention') return 50; + if (kind === 'direct_message') return 45; + if (kind === 'channel_added') return 40; + if (kind === 'interaction') return 30; + if (kind === 'channel_message') return 20; + if (kind === 'feed_post') return 10; + return 0; + } + + function mergeActivityEvent(existingEvt, incomingEvt) { + if (!existingEvt) return incomingEvt; + if (!incomingEvt) return existingEvt; + const existingPriority = activityPriority(existingEvt); + const incomingPriority = activityPriority(incomingEvt); + if (incomingPriority > existingPriority) return incomingEvt; + if (incomingPriority < existingPriority) return existingEvt; + const existingTs = Number(existingEvt.timestamp || 0); + const incomingTs = Number(incomingEvt.timestamp || 0); + return incomingTs >= existingTs ? incomingEvt : existingEvt; + } + function navigateToActivity(evt) { if (!evt) return; const kind = evt.kind || ''; @@ -4376,10 +4545,13 @@ try { if (kind === 'mention') { + if (ref.message_id) { + window.location.href = `/channels/locate?message_id=${encodeURIComponent(ref.message_id)}`; + return; + } if (ref.channel_id) { const url = new URL(routes.channels, window.location.origin); url.searchParams.set('focus_channel', ref.channel_id); - if (ref.message_id) url.searchParams.set('focus_message', ref.message_id); window.location.href = url.toString(); return; } @@ -4396,17 +4568,23 @@ window.location.href = url.toString(); return; } - if (kind === 'channel_message' && ref.channel_id) { - const url = new URL(routes.channels, window.location.origin); - url.searchParams.set('focus_channel', ref.channel_id); - if (ref.message_id) url.searchParams.set('focus_message', ref.message_id); - window.location.href = url.toString(); - return; + if (kind === 'channel_message') { + if (ref.message_id) { + window.location.href = `/channels/locate?message_id=${encodeURIComponent(ref.message_id)}`; + return; + } + if (ref.channel_id) { + const url = new URL(routes.channels, window.location.origin); + url.searchParams.set('focus_channel', ref.channel_id); + window.location.href = url.toString(); + return; + } } if (kind === 'direct_message') { const otherUserId = otherUserIdFromDirectMessage(ref); const url = new URL(routes.messages, window.location.origin); if (otherUserId) url.searchParams.set('with', otherUserId); + if (ref.message_id) url.hash = `message-${ref.message_id}`; window.location.href = url.toString(); return; } @@ -4580,18 +4758,24 @@ } function recordEvent(evt) { - const refKey = eventRefKey(evt); - if (evt.kind === 'mention' && refKey) { - mentionRefs.add(refKey); - events = events.filter(e => eventRefKey(e) !== refKey || e.kind === 'mention'); - } else if (refKey && mentionRefs.has(refKey)) { - return; - } + const semanticKey = activitySemanticKey(evt); + if (!semanticKey) return; const uid = getUserRefId(evt); if (uid) scheduleUserInfoFetch([uid]); - notificationCount += 1; - events.unshift(evt); + + const existingIndex = events.findIndex((existingEvt) => activitySemanticKey(existingEvt) === semanticKey); + if (existingIndex >= 0) { + const merged = mergeActivityEvent(events[existingIndex], evt); + events.splice(existingIndex, 1); + events.unshift(merged); + } else { + events.unshift(evt); + } events = events.slice(0, 12); + if (!unreadSemanticKeys.has(semanticKey)) { + unreadSemanticKeys.add(semanticKey); + notificationCount += 1; + } setBadge(notificationCount); renderMenu(); markPeerActive(evt.peer_id); @@ -4646,13 +4830,14 @@ bellBtn.addEventListener('click', () => { // Opening the bell acknowledges current count, but keeps the history in the menu. notificationCount = 0; + unreadSemanticKeys.clear(); setBadge(0); }); if (clearBtn) { clearBtn.addEventListener('click', () => { events = []; - mentionRefs.clear(); + unreadSemanticKeys.clear(); notificationCount = 0; setBadge(0); renderMenu(); diff --git a/canopy/ui/templates/base.html b/canopy/ui/templates/base.html index 3b8633c..4f5c7b2 100644 --- a/canopy/ui/templates/base.html +++ b/canopy/ui/templates/base.html @@ -295,6 +295,52 @@ text-align: center; } + .sidebar .nav-link.nav-link-unread { + display: flex; + align-items: center; + justify-content: space-between; + gap: 0.75rem; + } + + .sidebar .nav-link.nav-link-unread i { + margin-right: 0; + flex-shrink: 0; + } + + .sidebar-nav-label { + display: inline-flex; + align-items: center; + gap: 12px; + min-width: 0; + } + + .sidebar-nav-text { + min-width: 0; + } + + .sidebar-nav-badge { + display: inline-flex; + align-items: center; + justify-content: center; + min-width: 1.5rem; + height: 1.5rem; + padding: 0 0.45rem; + border-radius: 999px; + background: linear-gradient(135deg, #ef4444 0%, #dc2626 100%); + color: #fff; + font-size: 0.72rem; + font-weight: 700; + line-height: 1; + letter-spacing: 0.01em; + box-shadow: 0 8px 18px rgba(239, 68, 68, 0.28); + border: 1px solid rgba(255, 255, 255, 0.18); + flex-shrink: 0; + } + + .sidebar-nav-badge[hidden] { + display: none !important; + } + .sidebar-peers { margin: 8px 12px 8px; padding: 10px; @@ -1157,16 +1203,30 @@ position: relative; } - .sidebar.collapsed .nav-link span { + .sidebar.collapsed .nav-link .sidebar-nav-text { opacity: 0; visibility: hidden; transition: all 0.2s ease; } .sidebar.collapsed .nav-link i { - margin-right: 0; font-size: 1.2rem; } + + .sidebar.collapsed .nav-link.nav-link-unread { + justify-content: center; + } + + .sidebar.collapsed .sidebar-nav-badge { + position: absolute; + top: 7px; + right: 7px; + min-width: 1.3rem; + height: 1.3rem; + padding: 0 0.3rem; + font-size: 0.68rem; + box-shadow: 0 6px 16px rgba(239, 68, 68, 0.25); + } .sidebar.collapsed .sidebar-divider, .sidebar.collapsed .sidebar-info { @@ -5281,17 +5341,41 @@