From df0367a90826978b8b2e51d76583620fcdfcd18f Mon Sep 17 00:00:00 2001 From: "Carson M." Date: Thu, 4 Dec 2025 02:10:04 -0600 Subject: [PATCH 1/8] start yt rewrite --- .vscode/settings.json | 4 +- Cargo.toml | 28 +- clippy.toml | 1 + examples/youtube.rs | 59 +++- src/lib.rs | 10 +- src/multicast.rs | 36 +- src/twitch/event.rs | 28 +- src/youtube/client.rs | 232 ++++++++++++ src/youtube/context.rs | 332 ++++++------------ src/youtube/error.rs | 66 ---- src/youtube/mod.rs | 546 +++++++++++++++-------------- src/youtube/signaler.rs | 206 +++++++---- src/youtube/types/browse.rs | 118 +++++++ src/youtube/types/get_live_chat.rs | 234 +++++-------- src/youtube/types/mod.rs | 158 ++++++--- src/youtube/types/streams_page.rs | 117 ------- src/youtube/types/video.rs | 46 +++ src/youtube/util.rs | 201 ++++++++++- 18 files changed, 1415 insertions(+), 1007 deletions(-) create mode 100644 clippy.toml create mode 100644 src/youtube/client.rs delete mode 100644 src/youtube/error.rs create mode 100644 src/youtube/types/browse.rs delete mode 100644 src/youtube/types/streams_page.rs create mode 100644 src/youtube/types/video.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index 9494113..cf57bd7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,4 +1,6 @@ { "rust-analyzer.check.features": ["tls-native", "serde"], - "rust-analyzer.cargo.features": ["tls-native", "serde"] + "rust-analyzer.cargo.features": ["tls-native", "serde"], + "rust-analyzer.check.command": "clippy", + "rust-analyzer.diagnostics.experimental.enable": true, } diff --git a/Cargo.toml b/Cargo.toml index 577939d..3d1e396 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,35 +6,33 @@ license = "Apache-2.0" keywords = [ "chat", "twitch", "youtube", "live" ] authors = [ "Carson M. " ] repository = "https://github.com/pykeio/brainrot" -edition = "2021" -rust-version = "1.75" +edition = "2024" +rust-version = "1.89" [dependencies] -tracing = "0.1" +tracing = { version = "0.1", default-features = false, features = [ "std" ] } irc = { version = "1.0", optional = true, default-features = false } tokio = { version = "1.42", default-features = false, features = [ "net" ] } -futures-util = { version = "0.3", default-features = false } +futures-util = { version = "0.3", default-features = false, features = [ "std" ] } thiserror = "2.0" -chrono = { version = "0.4", default-features = false, features = [ "clock", "std" ] } serde = { version = "1.0", optional = true, features = [ "derive" ] } -serde-aux = { version = "4.4", optional = true } uuid = { version = "1.11", optional = true } -reqwest = { version = "0.12", default-features = false, optional = true, features = [ "charset", "http2" ] } -simd-json = { version = "0.17", optional = true } -url = { version = "2.5", optional = true } -rand = { version = "0.9", optional = true } -regex = { version = "1.11", optional = true } +simd-json = { version = "0.17", default-features = false, optional = true, features = [ "serde_impl" ] } +http = { version = "1.0", optional = true } +bytes = { version = "1.2", default-features = false, optional = true } +fastrand = { version = "2.3", optional = true } async-stream-lite = "0.2" pin-project-lite = "0.2" [dev-dependencies] anyhow = "1.0" tokio = { version = "1.42", features = [ "rt", "rt-multi-thread", "macros", "net" ] } +reqwest = "0.12" [features] default = [ "tls-native", "twitch", "youtube" ] twitch = [ "dep:irc", "dep:uuid" ] -youtube = [ "dep:simd-json", "dep:reqwest", "dep:rand", "dep:serde", "dep:url", "dep:regex", "dep:serde-aux" ] -serde = [ "dep:serde", "chrono/serde", "uuid?/serde" ] -tls-native = [ "irc?/tls-native", "reqwest?/native-tls" ] -tls-rust = [ "irc?/tls-rust", "reqwest?/rustls-tls" ] +youtube = [ "dep:simd-json", "dep:http", "dep:bytes", "dep:fastrand", "dep:serde" ] +serde = [ "dep:serde", "uuid?/serde" ] +tls-native = [ "irc?/tls-native" ] +tls-rust = [ "irc?/tls-rust" ] diff --git a/clippy.toml b/clippy.toml new file mode 100644 index 0000000..f2c7fb1 --- /dev/null +++ b/clippy.toml @@ -0,0 +1 @@ +msrv = "1.89" diff --git a/examples/youtube.rs b/examples/youtube.rs index 8e1162c..ab7c411 100644 --- a/examples/youtube.rs +++ b/examples/youtube.rs @@ -14,26 +14,55 @@ use std::env::args; -use brainrot::youtube::{self, Action, ChatItem}; +use brainrot::youtube::{self, ChatEvent, RequestExecutor, Response, StreamChatMode, StreamContext}; use futures_util::StreamExt; +#[derive(Debug, Default)] +struct ReqwestExecutor(reqwest::Client); + +impl RequestExecutor for ReqwestExecutor { + type Response = Respownse; + type Error = reqwest::Error; + + async fn make_request(&self, req: http::Request) -> Result { + self.0.execute(req.try_into().unwrap()).await.map(Respownse) + } +} + +#[derive(Debug)] +struct Respownse(reqwest::Response); + +impl Response for Respownse { + type Error = reqwest::Error; + + fn status_code(&self) -> u16 { + self.0.status().as_u16() + } + + async fn recv_chunk(&mut self) -> Result, Self::Error> { + self.0.chunk().await + } +} + #[tokio::main] async fn main() -> anyhow::Result<()> { - let context = - youtube::ChatContext::new_from_channel(args().nth(1).as_deref().unwrap_or("@miyukiwei"), youtube::ChannelSearchOptions::LatestLiveOrUpcoming).await?; - let mut stream = youtube::stream(&context).await?; - while let Some(Ok(c)) = stream.next().await { - if let Action::AddChatItem { - item: ChatItem::TextMessage { message_renderer_base, message }, - .. - } = c - { - println!( - "{}: {}", - message_renderer_base.author_name.unwrap_or_default().simple_text, - message.unwrap().runs.into_iter().map(|c| c.to_chat_string()).collect::() - ); + let client = youtube::Client::::default(); + let streams = youtube::query_channel(args().nth(1).as_deref().unwrap_or("@miyukiwei"), &client).await?; + + let context = StreamContext::new(client, streams[0].id(), StreamChatMode::Live).await?; + let mut chat = youtube::Chat::new(context).await?; + + for event in chat.initial_events() { + match event { + ChatEvent::Message { text } => println!("{text}") } } + + while let Some(event) = chat.next().await.transpose()? { + match event { + ChatEvent::Message { text } => println!("{text}") + } + } + Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 82676b3..2ef45ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![warn(clippy::unwrap_used)] + #[cfg(feature = "twitch")] pub mod twitch; #[cfg(feature = "twitch")] @@ -20,9 +22,9 @@ pub use self::twitch::{Chat as TwitchChat, ChatEvent as TwitchChatEvent, Message #[cfg(feature = "youtube")] pub mod youtube; -#[cfg(all(feature = "twitch", feature = "youtube"))] -mod multicast; -#[cfg(all(feature = "twitch", feature = "youtube"))] -pub use self::multicast::{Multicast, MulticastError, VariantChat}; +// #[cfg(all(feature = "twitch", feature = "youtube"))] +// mod multicast; +// #[cfg(all(feature = "twitch", feature = "youtube"))] +// pub use self::multicast::{Multicast, MulticastError, VariantChat}; pub(crate) mod util; diff --git a/src/multicast.rs b/src/multicast.rs index 318efcb..bad1578 100644 --- a/src/multicast.rs +++ b/src/multicast.rs @@ -14,7 +14,7 @@ use std::{pin::Pin, task::Poll}; -use futures_util::{Stream, stream::BoxStream}; +use futures_util::Stream; use pin_project_lite::pin_project; use thiserror::Error; @@ -31,57 +31,57 @@ pub enum MulticastError { #[derive(Debug)] pub enum VariantChat { Twitch(twitch::ChatEvent), - YouTube(youtube::Action) + YouTube(youtube::ChatEvent) } pin_project! { #[project = VariantStreamProject] - enum VariantStream<'a> { + enum VariantStream { Twitch { #[pin] x: crate::twitch::Chat }, - YouTube { #[pin] x: BoxStream<'a, Result> } + YouTube { #[pin] x: crate::youtube::Chat } } } -impl<'a> Stream for VariantStream<'a> { +impl Stream for VariantStream { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { match self.project() { VariantStreamProject::YouTube { x } => { - Poll::Ready(futures_util::ready!(x.poll_next(cx)).map(|x| x.map(|c| VariantChat::YouTube(c)).map_err(MulticastError::YouTubeError))) + Poll::Ready(futures_util::ready!(x.poll_next(cx)).map(|x| x.map(VariantChat::YouTube).map_err(MulticastError::YouTubeError))) } VariantStreamProject::Twitch { x } => { - Poll::Ready(futures_util::ready!(x.poll_next(cx)).map(|x| x.map(|c| VariantChat::Twitch(c)).map_err(MulticastError::TwitchError))) + Poll::Ready(futures_util::ready!(x.poll_next(cx)).map(|x| x.map(VariantChat::Twitch).map_err(MulticastError::TwitchError))) } } } } -impl<'a> From for VariantStream<'a> { - fn from(value: crate::twitch::Chat) -> Self { +impl From for VariantStream { + fn from(value: twitch::Chat) -> Self { Self::Twitch { x: value } } } -impl<'a> From>> for VariantStream<'a> { - fn from(value: BoxStream<'a, Result>) -> Self { +impl From for VariantStream { + fn from(value: youtube::Chat) -> Self { Self::YouTube { x: value } } } pin_project! { - pub struct Multicast<'a> { + pub struct Multicast { #[pin] - streams: Vec> + streams: Vec } } -impl<'a> Multicast<'a> { +impl Multicast { pub fn new() -> Self { Self { streams: vec![] } } - pub fn push<'b: 'a>(&mut self, stream: impl Into>) { + pub fn push(&mut self, stream: impl Into) { self.streams.push(stream.into()); } @@ -90,13 +90,13 @@ impl<'a> Multicast<'a> { Ok(()) } - pub async fn push_youtube<'b: 'a>(&mut self, context: &'b youtube::ChatContext) -> Result<(), youtube::Error> { - self.push(youtube::stream(context).await?); + pub async fn push_youtube(&mut self, context: youtube::StreamContext) -> Result<(), youtube::Error> { + self.push(youtube::Chat::new(context).await?); Ok(()) } } -impl<'a> Stream for Multicast<'a> { +impl Stream for Multicast { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { diff --git a/src/twitch/event.rs b/src/twitch/event.rs index 0a2ff22..cfd84ad 100644 --- a/src/twitch/event.rs +++ b/src/twitch/event.rs @@ -14,10 +14,10 @@ use std::{ collections::HashMap, + fmt, num::{NonZeroU16, NonZeroU32} }; -use chrono::{DateTime, TimeZone, Utc}; use irc::proto::{Command, Response}; use uuid::Uuid; @@ -81,11 +81,11 @@ pub enum MessageSegment { } } -impl ToString for MessageSegment { - fn to_string(&self) -> String { +impl fmt::Display for MessageSegment { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::Text { text } => text.to_owned(), - Self::Emote { name, .. } => name.to_owned() + Self::Text { text } => f.write_str(text), + Self::Emote { name, .. } => f.write_str(name) } } } @@ -95,7 +95,7 @@ pub enum ChatEvent { Message { id: Uuid, user: User, - sent_at: DateTime, + sent_at_ms: i64, reply_to: Option, emote_only: bool, first_message: bool, @@ -105,7 +105,7 @@ pub enum ChatEvent { id: Uuid, user: User, bits: NonZeroU32, - sent_at: DateTime, + sent_at_ms: i64, segments: Vec }, MemberChunk { @@ -230,19 +230,23 @@ pub(crate) fn to_chat_event(message: irc::proto::Message) -> Option { }; let id = tags.remove("id").and_then(|f| f.parse().ok())?; - let sent_at = Utc - .timestamp_opt(tags.remove("tmi-sent-ts").and_then(|f| f.parse::().map(|f| f / 1000).ok())?, 0) - .latest()?; + let sent_at = tags.remove("tmi-sent-ts").and_then(|f| f.parse::().ok())?; if let Some(bits) = tags.remove("bits").and_then_nonempty(|f| f.parse().ok()) { - return Some(ChatEvent::SendBits { id, user, bits, sent_at, segments }); + return Some(ChatEvent::SendBits { + id, + user, + bits, + sent_at_ms: sent_at, + segments + }); } Some(ChatEvent::Message { id, user, reply_to: tags.remove("reply-parent-msg-id").and_then(|f| f.parse().ok()), - sent_at, + sent_at_ms: sent_at, emote_only: matches!(tags.remove("emote-only").as_deref(), Some("1")), first_message: matches!(tags.remove("first-msg").as_deref(), Some("1")), contents: segments diff --git a/src/youtube/client.rs b/src/youtube/client.rs new file mode 100644 index 0000000..4f1d84f --- /dev/null +++ b/src/youtube/client.rs @@ -0,0 +1,232 @@ +use std::{error::Error as StdError, fmt, future::Future, sync::OnceLock}; + +use bytes::{Bytes, BytesMut}; +use http::{HeaderMap, HeaderName, HeaderValue, Method, Request, Uri, header, request::Builder as RequestBuilder, uri::PathAndQuery}; + +use crate::youtube::types::{ + self, InnertubeRequest, InnertubeRequestContext, InnertubeRequestContextClient, browse::BrowseRequest, get_live_chat::GetLiveChatRequest, + video::VideoRequest +}; + +pub(crate) const DEFAULT_CLIENT_NAME: &str = "WEB"; +pub(crate) const DEFAULT_CLIENT_VERSION: &str = "2.20250925.01.00"; + +pub trait Response: Send + Sized { + type Error: StdError + Send; + + fn status_code(&self) -> u16; + + fn recv_chunk(&mut self) -> impl Future, Self::Error>> + Send + Sync + '_; + + fn recv_all(mut self) -> impl Future> + Send { + async move { + let mut out = BytesMut::new(); + while let Some(frame) = self.recv_chunk().await? { + out.extend_from_slice(&frame); + } + Ok(out) + } + } +} + +pub(crate) trait ResponseExt: Response + Sized { + fn with_innertube_error(self) -> impl Future>; +} + +impl ResponseExt for T { + async fn with_innertube_error(self) -> Result { + match self.status_code() { + 200 => Ok(self), + status_code => { + let idk = Err(InnertubeError::Unknown { status_code }); + + let Ok(mut x) = self.recv_all().await else { + return idk; + }; + + let Ok(error): Result = simd_json::from_slice(&mut x) else { + return idk; + }; + + Err(InnertubeError::Specific { + status_code, + message: error.message.to_string(), + code: error.status.to_string() + }) + } + } + } +} + +#[derive(Debug)] +pub enum InnertubeError { + Specific { status_code: u16, message: String, code: String }, + Unknown { status_code: u16 } +} + +impl InnertubeError { + #[inline] + pub const fn status_code(&self) -> u16 { + match self { + Self::Specific { status_code, .. } => *status_code, + Self::Unknown { status_code } => *status_code + } + } +} + +impl fmt::Display for InnertubeError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Specific { status_code, message, code } => f.write_fmt(format_args!("innertube returned status {status_code}: {message} ({code})")), + Self::Unknown { status_code } => f.write_fmt(format_args!("innertube returned status {status_code} (couldn't decode body)")) + } + } +} + +impl StdError for InnertubeError {} + +pub trait RequestExecutor: Send + Sync + 'static { + type Response: Response; + type Error: StdError + Send; + + fn make_request(&self, req: http::Request) -> impl Future> + Send + Sync + '_; +} + +#[derive(Debug)] +pub enum ClientError { + BadRequest(http::Error), + Serialize(simd_json::Error), + Executor(E) +} + +impl From for ClientError { + fn from(e: simd_json::Error) -> Self { + Self::Serialize(e) + } +} +impl From for ClientError { + fn from(e: http::Error) -> Self { + Self::BadRequest(e) + } +} + +impl fmt::Display for ClientError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Serialize(e) => f.write_fmt(format_args!("body serialization failed: {e}")), + Self::BadRequest(e) => f.write_fmt(format_args!("accidentally built malformed request: {e}")), + Self::Executor(e) => f.write_fmt(format_args!("failed to execute request: {e}")) + } + } +} + +impl StdError for ClientError { + fn cause(&self) -> Option<&dyn StdError> { + match self { + Self::Serialize(e) => Some(e), + Self::BadRequest(e) => Some(e), + Self::Executor(e) => Some(e) + } + } +} + +#[derive(Debug, Clone)] +pub struct Client { + http_client: E, + default_headers: HeaderMap, + innertube_client: InnertubeRequestContextClient<'static> +} + +macro_rules! endpoint { + ($name:ident($body:ty), $path:expr) => { + #[inline] + pub(crate) async fn $name(&self, body: $body) -> Result> { + static ENDPOINT: OnceLock = OnceLock::new(); + + #[cold] + fn url_factory() -> Uri { + Uri::builder() + .scheme("https") + .authority("www.youtube.com") + .path_and_query(PathAndQuery::from_static(concat!("/youtubei/v1", $path, "?prettyPrint=false"))) + .build() + .expect("invalid endpoint URI") + } + + let body = simd_json::to_vec(&InnertubeRequest::new(self, body))?; + let request = self + .base_request(ENDPOINT.get_or_init(url_factory).clone()) + .method(Method::POST) + .header(header::CONTENT_TYPE, HeaderValue::from_static("application/json")) + .body(body.into())?; + self.execute(request).await + } + }; +} + +impl Client { + pub fn new(executor: E) -> Self { + Self::new_with_context( + executor, + InnertubeRequestContextClient { + client_name: DEFAULT_CLIENT_NAME, + client_version: DEFAULT_CLIENT_VERSION + } + ) + } + + pub fn new_with_context(executor: E, context: InnertubeRequestContextClient<'static>) -> Self { + let mut headers = HeaderMap::new(); + headers.append( + HeaderName::from_static("x-youtube-client-name"), + match context.client_name { + "WEB" => HeaderValue::from_static("1"), + x => unimplemented!("Unknown client name '{x}'") + } + ); + headers.append(HeaderName::from_static("x-youtube-client-version"), HeaderValue::from_str(context.client_version).expect("Invalid client version")); + + // Set our Accept-Language to en-US so we can properly match substrings + headers.append(header::ACCEPT_LANGUAGE, HeaderValue::from_static("en-US,en;q=0.5")); + headers.append(header::USER_AGENT, HeaderValue::from_static("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:138.0) Gecko/20100101 Firefox/138.0")); + // Referer is required by Signaler endpoints. + headers.append(header::REFERER, HeaderValue::from_static("https://www.youtube.com/")); + + Self { + http_client: executor, + default_headers: headers, + innertube_client: context + } + } + + #[inline] + pub(crate) fn request_context(&self) -> InnertubeRequestContext<'_> { + InnertubeRequestContext { + client: self.innertube_client.clone() + } + } + + pub(crate) fn base_request(&self, uri: Uri) -> RequestBuilder { + let mut request = Request::builder().uri(uri); + for (name, value) in self.default_headers.iter() { + request = request.header(name, value); + } + request + } + + pub(crate) async fn execute(&self, request: http::Request) -> Result> { + self.http_client.make_request(request).await.map_err(ClientError::Executor) + } + + endpoint!(browse(BrowseRequest<'_>), "/browse"); + endpoint!(video(VideoRequest<'_>), "/next"); + endpoint!(chat_live(GetLiveChatRequest<'_>), "/live_chat/get_live_chat"); + endpoint!(chat_replay(GetLiveChatRequest<'_>), "/live_chat/get_live_chat_replay"); +} + +impl Default for Client { + #[inline] + fn default() -> Self { + Client::new(E::default()) + } +} diff --git a/src/youtube/context.rs b/src/youtube/context.rs index bab6745..8cb5a9f 100644 --- a/src/youtube/context.rs +++ b/src/youtube/context.rs @@ -12,260 +12,140 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::OnceLock; +use std::{error::Error as StdError, fmt}; -use regex::Regex; -use url::Url; - -use super::{ - Error, get_http_client, - types::streams_page::{ - FeedContentsRenderer, PageContentsRenderer, RichGridItem, RichItemContent, TabItemRenderer, ThumbnailOverlay, VideoTimeStatus, YouTubeInitialData - } +use crate::youtube::{ + ClientError, + client::{Client, InnertubeError, RequestExecutor, Response, ResponseExt}, + types::video::{ContinuationData, ConversationBar, VideoRequest, VideoResponse, VideoResponseContents} }; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum LiveStreamStatus { - Upcoming, - Live, - Replay -} - -impl LiveStreamStatus { - #[inline] - pub fn updates_live(&self) -> bool { - matches!(self, LiveStreamStatus::Upcoming | LiveStreamStatus::Live) - } -} - #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] -pub enum ChannelSearchOptions { - /// Get the live chat of the latest live stream, or the pre-stream chat of the latest upcoming stream if no stream - /// is currently live. - LatestLiveOrUpcoming, - /// Get the live chat of the first live stream, or the pre-stream chat of the first upcoming stream if no stream - /// is currently live. +pub enum StreamChatMode { + /// Top chat, filtering out potential spam #[default] - FirstLiveOrUpcoming, - /// Get the live chat of the first live stream. - FirstLive, - /// Get the live chat of the latest live stream. - LatestLive + Top, + /// Live chat with minimal filtering + Live } -#[derive(Clone, Debug)] -pub struct ChatContext { - pub(crate) id: String, - pub(crate) api_key: String, - pub(crate) client_version: String, +#[derive(Debug)] +pub struct StreamContext { + pub(crate) client: Client, pub(crate) initial_continuation: String, - pub(crate) tango_api_key: Option, - pub(crate) live_status: LiveStreamStatus + pub(crate) is_replay: bool } -impl ChatContext { - pub async fn new_from_channel(channel_id: impl AsRef, options: ChannelSearchOptions) -> Result { - let channel_id = channel_id.as_ref(); - let channel_id = if channel_id.starts_with("UC") || channel_id.starts_with('@') { - channel_id - } else { - Self::parse_channel_link(channel_id).ok_or_else(|| Error::InvalidChannelID(channel_id.to_string()))? - }; - let page_contents = get_http_client() - .get(if channel_id.starts_with('@') { - format!("https://www.youtube.com/{channel_id}/streams") - } else { - format!("https://www.youtube.com/channel/{channel_id}/streams") - }) - .send() - .await? - .text() - .await?; - - static YT_INITIAL_DATA_REGEX: OnceLock = OnceLock::new(); - let yt_initial_data: YouTubeInitialData = unsafe { - simd_json::from_str( - &mut YT_INITIAL_DATA_REGEX - .get_or_init(|| Regex::new(r#"var ytInitialData\s*=\s*(\{.+?\});"#).unwrap()) - .captures(&page_contents) - .ok_or_else(|| Error::NoChatContinuation)? - .get(1) - .ok_or(Error::MissingInitialData)? - .as_str() - .to_owned() - ) - }?; - - let mut live_id = None; - match yt_initial_data.contents { - PageContentsRenderer::TwoColumnBrowseResultsRenderer { tabs } => match tabs - .iter() - .find(|c| match c { - TabItemRenderer::TabRenderer { title, content, .. } => content.is_some() && title == "Live", - TabItemRenderer::ExpandableTabRenderer { .. } => false - }) - .ok_or_else(|| Error::NoMatchingStream(channel_id.to_string()))? - { - TabItemRenderer::TabRenderer { content, .. } => match content.as_ref().unwrap() { - FeedContentsRenderer::RichGridRenderer { contents } => { - let finder = |c: &&RichGridItem| match c { - RichGridItem::RichItemRenderer { content, .. } => match content { - RichItemContent::VideoRenderer { thumbnail_overlays, video_id, .. } => thumbnail_overlays.iter().any(|c| match c { - ThumbnailOverlay::TimeStatus { style, .. } => { - if *style == VideoTimeStatus::Live { - live_id = Some((video_id.to_owned(), true)); - true - } else { - if *style == VideoTimeStatus::Upcoming - && matches!(options, ChannelSearchOptions::FirstLiveOrUpcoming | ChannelSearchOptions::LatestLiveOrUpcoming) - { - match &live_id { - None => { - live_id = Some((video_id.to_owned(), false)); - } - Some((_, false)) => { - live_id = Some((video_id.to_owned(), false)); - } - Some((_, true)) => {} - } - } - false - } - } - _ => false - }) - }, - RichGridItem::ContinuationItemRenderer { .. } => false - }; - if matches!(options, ChannelSearchOptions::FirstLive | ChannelSearchOptions::FirstLiveOrUpcoming) { - contents.iter().rev().find(finder) - } else { - contents.iter().find(finder) - } - .ok_or_else(|| Error::NoMatchingStream(channel_id.to_string()))? - } - _ => return Err(Error::NoMatchingStream(channel_id.to_string())) - }, - TabItemRenderer::ExpandableTabRenderer { .. } => unreachable!() - } - }; - - ChatContext::new_from_live(live_id.ok_or_else(|| Error::NoMatchingStream(channel_id.to_string()))?.0).await - } - - pub async fn new_from_live(id: impl AsRef) -> Result { +impl StreamContext { + pub async fn new(client: Client, id: impl AsRef, mode: StreamChatMode) -> Result> { let id = id.as_ref(); - let live_id = if id.is_ascii() && id.len() == 11 { - id - } else { - Self::parse_stream_link(id).ok_or_else(|| Error::InvalidVideoID(id.to_string()))? - }; - let page_contents = get_http_client() - .get(format!("https://www.youtube.com/watch?v={live_id}")) - .send() - .await? - .text() - .await?; + if !id.is_ascii() || id.len() != 11 { + return Err(StreamContextError::InvalidVideoID); + } - static LIVE_STREAM_REGEX: OnceLock = OnceLock::new(); - let live_status = if LIVE_STREAM_REGEX - .get_or_init(|| Regex::new(r#"['"]isLiveContent['"]:\s*(true)"#).unwrap()) - .find(&page_contents) - .is_some() - { - static LIVE_NOW_REGEX: OnceLock = OnceLock::new(); - static REPLAY_REGEX: OnceLock = OnceLock::new(); - if LIVE_NOW_REGEX - .get_or_init(|| Regex::new(r#"['"]isLiveNow['"]:\s*(true)"#).unwrap()) - .find(&page_contents) - .is_some() - { - LiveStreamStatus::Live - } else if REPLAY_REGEX - .get_or_init(|| Regex::new(r#"['"]isReplay['"]:\s*(true)"#).unwrap()) - .find(&page_contents) - .is_some() - { - LiveStreamStatus::Replay - } else { - LiveStreamStatus::Upcoming - } - } else { - return Err(Error::NotStream(live_id.to_string())); + let mut video_response = client + .video(VideoRequest { video_id: id }) + .await? + .with_innertube_error() + .await? + .recv_all() + .await + .map_err(StreamContextError::Receive)?; + let video_response: VideoResponse<'_> = simd_json::from_slice(&mut video_response)?; + + let VideoResponseContents::TwoColumnWatchNextResults { + conversation_bar: Some(conversation_bar) + } = video_response.contents + else { + return Err(StreamContextError::NoChat); }; - static INNERTUBE_API_KEY_REGEX: OnceLock = OnceLock::new(); - let api_key = match INNERTUBE_API_KEY_REGEX - .get_or_init(|| Regex::new(r#"['"]INNERTUBE_API_KEY['"]:\s*['"](.+?)['"]"#).unwrap()) - .captures(&page_contents) - .and_then(|captures| captures.get(1)) - { - Some(matched) => matched.as_str().to_string(), - None => return Err(Error::NoInnerTubeKey) + let ConversationBar::LiveChatRenderer { continuations, is_replay } = conversation_bar else { + return Err(StreamContextError::NoChat); }; - static TANGO_API_KEY_REGEX: OnceLock = OnceLock::new(); - let tango_api_key = TANGO_API_KEY_REGEX - .get_or_init(|| Regex::new(r#"['"]LIVE_CHAT_BASE_TANGO_CONFIG['"]:\s*\{\s*['"]apiKey['"]\s*:\s*['"](.+?)['"]"#).unwrap()) - .captures(&page_contents) - .and_then(|captures| captures.get(1).map(|c| c.as_str().to_string())); - - static CLIENT_VERSION_REGEX: OnceLock = OnceLock::new(); - let client_version = match CLIENT_VERSION_REGEX - .get_or_init(|| Regex::new(r#"['"]clientVersion['"]:\s*['"]([\d.]+?)['"]"#).unwrap()) - .captures(&page_contents) - .and_then(|captures| captures.get(1)) - { - Some(matched) => matched.as_str().to_string(), - None => "2.20240207.07.00".to_string() + let Some(ContinuationData::ReloadContinuationData { continuation }) = continuations.first() else { + return Err(StreamContextError::NoChat); }; - static CONTINUATION_REGEX: OnceLock = OnceLock::new(); - let continuation_regex = CONTINUATION_REGEX.get_or_init(|| { - Regex::new(r#"['"]continuations['"]:\s*\[\{\s*['"]reloadContinuationData['"]:\s*\{['"]continuation['"]:\s*['"](.+?)['"]"#).unwrap() - }); - let continuation = match continuation_regex.captures(&page_contents).and_then(|captures| captures.get(1)) { - Some(matched) => matched.as_str().to_string(), - None => return Err(Error::NoChatContinuation) - }; + let mut continuation = continuation.to_string(); + + if mode == StreamChatMode::Live { + // All continuation tokens are base64url-encoded protobuf. The byte sequence `08 08 xx 18` is present in all + // of them - `xx` determines whether the top chat or live chat is used, where top chat is `01` and live chat is `04`. + // YT API only provides 01 (top chat) tokens. In lieu of manually building the protobuf messages ourselves + // and pulling in a base64 encoder/decoder, have this extremely fragile and extremely stupid mechanism instead. + if is_replay { + if let Some((index, pat)) = continuation.match_indices("NEQAFyCAgEGAIgAC").next() { + continuation.replace_range(index..index + pat.len(), "NEQAFyCAgBGAIgAC"); + } else { + tracing::warn!("failed to find sentinel in continuation token; top chat will be used instead"); + } + } else if let Some((index, pat)) = continuation.match_indices("RDABggEICAQYAiAAKAC").next() { + continuation.replace_range(index..index + pat.len(), "RDABggEICAEYAiAAKAC"); + } else { + tracing::warn!("failed to find sentinel in continuation token; top chat will be used instead"); + } + } - Ok(ChatContext { - id: live_id.to_string(), - api_key, - client_version, - tango_api_key, + Ok(StreamContext { + client, initial_continuation: continuation, - live_status + is_replay }) } +} - fn parse_stream_link(url: &str) -> Option<&str> { - static LINK_RE: OnceLock = OnceLock::new(); - LINK_RE - .get_or_init(|| Regex::new(r#"(?:https?:\/\/)?(?:www\.)?youtu\.?be(?:\.com)?\/?.*(?:watch|embed)?(?:.*v=|v\/|\/)([A-Za-z0-9-_]+)"#).unwrap()) - .captures(url) - .and_then(|c| c.get(1)) - .map(|c| c.as_str()) - } +#[derive(Debug)] +pub enum StreamContextError { + InvalidVideoID, + NoChat, + Deserialize(simd_json::Error), + Client(ClientError), + Receive(::Error), + Innertube(InnertubeError) +} - fn parse_channel_link(url: &str) -> Option<&str> { - static CHANNEL_RE: OnceLock = OnceLock::new(); - CHANNEL_RE - .get_or_init(|| Regex::new(r#"^(?:https?:\/\/)?(?:www\.)?youtube\.com\/(?:channel\/(UC[\w-]{21}[AQgw])|(@[\w]+))$"#).unwrap()) - .captures(url) - .and_then(|c| c.get(1).or_else(|| c.get(2))) - .map(|c| c.as_str()) +impl From for StreamContextError { + fn from(e: simd_json::Error) -> Self { + Self::Deserialize(e) } - - pub fn id(&self) -> &str { - &self.id +} +impl From> for StreamContextError { + fn from(e: ClientError) -> Self { + Self::Client(e) } +} +impl From for StreamContextError { + fn from(e: InnertubeError) -> Self { + Self::Innertube(e) + } +} - pub fn url(&self) -> Url { - Url::parse(&format!("https://www.youtube.com/watch?v={}", self.id)).unwrap() +impl fmt::Display for StreamContextError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidVideoID => f.write_str("invalid video ID"), + Self::NoChat => f.write_str("stream does not have chat enabled"), + Self::Deserialize(e) => f.write_fmt(format_args!("failed to deserialize response: {e}")), + Self::Client(e) => fmt::Display::fmt(e, f), + Self::Receive(e) => f.write_fmt(format_args!("failed to receive response: {e}")), + Self::Innertube(e) => fmt::Display::fmt(e, f) + } } +} - pub fn status(&self) -> LiveStreamStatus { - self.live_status +impl StdError for StreamContextError +where + E::Response: fmt::Debug +{ + fn cause(&self) -> Option<&dyn StdError> { + match self { + Self::Deserialize(e) => Some(e), + Self::Client(e) => Some(e), + Self::Receive(e) => Some(e), + Self::Innertube(e) => Some(e), + _ => None + } } } diff --git a/src/youtube/error.rs b/src/youtube/error.rs deleted file mode 100644 index dbfb3e2..0000000 --- a/src/youtube/error.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2024 pyke.io -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use reqwest::StatusCode; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum Error { - #[error("Invalid YouTube video ID or URL: {0}")] - InvalidVideoID(String), - #[error("Invalid YouTube channel ID or URL: {0}")] - InvalidChannelID(String), - #[error("Channel {0} has no live stream matching the options criteria")] - NoMatchingStream(String), - #[error("Missing `ytInitialData` structure from channel streams page.")] - MissingInitialData, - #[error("error when deserializing: {0}")] - Deserialization(#[from] simd_json::Error), - #[error("missing continuation contents")] - MissingContinuationContents, - #[error("reached end of continuation")] - EndOfContinuation, - #[error("request timed out")] - TimedOut, - #[error("request returned bad HTTP status: {0}")] - BadStatus(StatusCode), - #[error("request error: {0}")] - GeneralRequest(reqwest::Error), - #[error("{0} is not a live stream")] - NotStream(String), - #[error("Failed to match InnerTube API key")] - NoInnerTubeKey, - #[error("Chat continuation token could not be found.")] - NoChatContinuation, - #[error("Error parsing URL: {0}")] - URLParseError(#[from] url::ParseError) -} - -impl Error { - pub fn is_fatal(&self) -> bool { - !matches!(self, Error::TimedOut) - } -} - -impl From for Error { - fn from(value: reqwest::Error) -> Self { - if value.is_timeout() { - Error::TimedOut - } else if value.is_status() { - Error::BadStatus(value.status().unwrap()) - } else { - Error::GeneralRequest(value) - } - } -} diff --git a/src/youtube/mod.rs b/src/youtube/mod.rs index f42e312..77e9325 100644 --- a/src/youtube/mod.rs +++ b/src/youtube/mod.rs @@ -12,310 +12,344 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashSet, io::BufRead, sync::OnceLock, time::Duration}; +use std::{error::Error as StdError, fmt, io::BufRead, pin::Pin, task::Poll}; use async_stream_lite::try_async_stream; -use futures_util::stream::BoxStream; -use reqwest::header::{self, HeaderMap, HeaderValue}; +use futures_util::{Stream, stream::BoxStream}; +use pin_project_lite::pin_project; use simd_json::base::{ValueAsArray, ValueAsScalar}; -use thiserror::Error; -use tokio::time::sleep; +mod client; mod context; -mod error; mod signaler; mod types; mod util; +use self::{ + client::ResponseExt, + signaler::SignalerChannel, + types::get_live_chat::{Continuation, GetLiveChatRequest, GetLiveChatResponse}, + util::{TANGO_API_KEY, stringify_runs} +}; pub use self::{ - context::{ChannelSearchOptions, ChatContext, LiveStreamStatus}, - error::Error, + client::{Client, ClientError, InnertubeError, RequestExecutor, Response}, + context::{StreamChatMode, StreamContext}, types::{ ImageContainer, LocalizedRun, LocalizedText, Thumbnail, UnlocalizedText, get_live_chat::{Action, ChatItem, MessageRendererBase} - } -}; -use self::{ - signaler::SignalerChannelInner, - types::get_live_chat::{Continuation, GetLiveChatResponse} + }, + util::query_channel }; +use crate::youtube::signaler::SignalerError; -const TANGO_LIVE_ENDPOINT: &str = "https://www.youtube.com/youtubei/v1/live_chat/get_live_chat"; -const TANGO_REPLAY_ENDPOINT: &str = "https://www.youtube.com/youtubei/v1/live_chat/get_live_chat_replay"; - -pub(crate) fn get_http_client() -> &'static reqwest::Client { - static HTTP_CLIENT: OnceLock = OnceLock::new(); - HTTP_CLIENT.get_or_init(|| { - let mut headers = HeaderMap::new(); - // Set our Accept-Language to en-US so we can properly match substrings - headers.append(header::ACCEPT_LANGUAGE, HeaderValue::from_static("en-US,en;q=0.5")); - headers.append(header::USER_AGENT, HeaderValue::from_static("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:138.0) Gecko/20100101 Firefox/138.0")); - // Referer is required by Signaler endpoints. - headers.append(header::REFERER, HeaderValue::from_static("https://www.youtube.com/")); - reqwest::Client::builder().default_headers(headers).build().unwrap() - }) +#[derive(Debug)] +pub enum ChatEvent { + Message { text: String } } -struct ActionChunk<'r> { - actions: Vec, - ctx: &'r ChatContext, - continuation_token: Option, - pub(crate) signaler_topic: Option +impl ChatEvent { + pub(crate) fn from_action(action: &Action<'_>) -> Option { + match action { + Action::AddChatItem { item, .. } => match item { + ChatItem::TextMessage { message_renderer_base, message } => message.as_ref().map(|x| ChatEvent::Message { text: stringify_runs(&x.runs) }), + _ => None + }, + Action::ReplayChat { .. } => unreachable!("ReplayChat should be collapsed"), + _ => None + } + } } -unsafe impl<'r> Send for ActionChunk<'r> {} +pin_project! { + pub struct Chat { + initial_events: Vec, + #[pin] + stream: BoxStream<'static, Result>> + } +} -impl<'r> ActionChunk<'r> { - pub fn new(response: GetLiveChatResponse, ctx: &'r ChatContext) -> Result { - let continuation_contents = response.continuation_contents.ok_or(Error::EndOfContinuation)?; +impl Chat { + pub async fn new(context: StreamContext) -> Result> { + let mut initial_continuation_bytes = if !context.is_replay { + context + .client + .chat_live(GetLiveChatRequest { + continuation: &context.initial_continuation + }) + .await? + } else { + context + .client + .chat_replay(GetLiveChatRequest { + continuation: &context.initial_continuation + }) + .await? + } + .with_innertube_error() + .await? + .recv_all() + .await + .map_err(ChatError::Receive)?; + let initial_continuation: GetLiveChatResponse<'_> = simd_json::from_slice(&mut initial_continuation_bytes)?; - let continuation_token = match &continuation_contents.live_chat_continuation.continuations[0] { - Continuation::Invalidation { continuation, .. } => continuation.to_owned(), - Continuation::Timed { continuation, .. } => continuation.to_owned(), - Continuation::Replay { continuation, .. } => continuation.to_owned(), - Continuation::PlayerSeek { .. } => return Err(Error::EndOfContinuation) - }; - let signaler_topic = match &continuation_contents.live_chat_continuation.continuations[0] { - Continuation::Invalidation { invalidation_id, .. } => Some(invalidation_id.topic.to_owned()), - _ => None + let Some(contents) = initial_continuation.continuation_contents else { + return Err(ChatError::NoChat); }; - Ok(Self { - actions: if ctx.live_status.updates_live() { - continuation_contents + + match &contents.live_chat_continuation.continuations[0] { + Continuation::Invalidation { invalidation_id, continuation, .. } => { + let continuation_token = continuation.to_string(); + + let mut channel = SignalerChannel::with_topic(invalidation_id.topic, TANGO_API_KEY); + + let initial_events = contents .live_chat_continuation .actions .unwrap_or_default() .into_iter() - .map(|f| f.action) - .collect() - } else { - continuation_contents - .live_chat_continuation - .actions - .ok_or(Error::EndOfContinuation)? - .into_iter() - .flat_map(|f| match f.action { - Action::ReplayChat { actions, .. } => actions.into_iter().map(|f| f.action).collect(), - f => vec![f] - }) - .collect() - }, - ctx, - continuation_token: Some(continuation_token), - signaler_topic - }) - } + .filter_map(|act| ChatEvent::from_action(&act.action)) + .collect(); + let _ = initial_continuation; + let _ = initial_continuation_bytes; - pub fn iter(&self) -> std::slice::Iter<'_, Action> { - self.actions.iter() - } + Ok(Self { + initial_events, + stream: Box::pin(try_async_stream(move |yielder| async move { + let mut continuation_token = continuation_token; + 'i: loop { + let mut continuation_bytes = context + .client + .chat_live(GetLiveChatRequest { continuation: &continuation_token }) + .await? + .with_innertube_error() + .await? + .recv_all() + .await + .map_err(ChatError::Receive)?; + let continuation: GetLiveChatResponse<'_> = simd_json::from_slice(&mut continuation_bytes)?; + let Some(contents) = continuation.continuation_contents else { + break; + }; - pub async fn cont(&self) -> Option> { - if let Some(continuation_token) = &self.continuation_token { - let page = match GetLiveChatResponse::fetch(self.ctx, continuation_token).await { - Err(e) => return Some(Err(e)), - Ok(page) => page - }; - if page.continuation_contents.is_some() { Some(ActionChunk::new(page, self.ctx)) } else { None } - } else { - None - } - } -} + for event in contents + .live_chat_continuation + .actions + .unwrap_or_default() + .into_iter() + .filter_map(|act| ChatEvent::from_action(&act.action)) + { + yielder.r#yield(event).await; + } -impl<'r> IntoIterator for ActionChunk<'r> { - type Item = Action; - type IntoIter = std::vec::IntoIter; + let Some(Continuation::Invalidation { continuation: next_token, .. }) = contents.live_chat_continuation.continuations.first() + else { + break; + }; - fn into_iter(self) -> Self::IntoIter { - self.actions.into_iter() - } -} + continuation_token.clear(); + continuation_token.push_str(next_token); -pub async fn stream(options: &ChatContext) -> Result>, Error> { - let initial_chat = GetLiveChatResponse::fetch(options, &options.initial_continuation).await?; + let _ = continuation; + let _ = continuation_bytes; - Ok(Box::pin(try_async_stream(|yielder| async move { - let mut seen_messages = HashSet::new(); + let mut req = { + channel.reset(); + channel.choose_server(&context.client).await?; + channel.init_session(&context.client).await?; + channel.get_session_stream(&context.client).await? + }; + loop { + match req.recv_chunk().await { + Ok(Some(s)) => { + let Some(mut ofs_res_line) = s.lines().nth(1).transpose().expect("infallible") else { + break; + }; - match &initial_chat.continuation_contents.as_ref().unwrap().live_chat_continuation.continuations[0] { - Continuation::Invalidation { invalidation_id, .. } => { - let topic = invalidation_id.topic.to_owned(); + if let Ok(s) = unsafe { simd_json::from_str::(ofs_res_line.as_mut()) } { + if let Some(a) = s.as_array() { + // channel.aid = a[a.len() - 1].as_array().unwrap()[0].as_usize().unwrap(); + if let Some(aid) = a.last().and_then(|x| x.as_array()).and_then(|x| x.first()).and_then(|x| x.as_usize()) { + channel.aid = aid; + } + } + } - let mut chunk = ActionChunk::new(initial_chat, options)?; + let mut continuation = context + .client + .chat_live(GetLiveChatRequest { continuation: &continuation_token }) + .await? + .with_innertube_error() + .await? + .recv_all() + .await + .map_err(ChatError::Receive)?; + let continuation: GetLiveChatResponse<'_> = simd_json::from_slice(&mut continuation)?; + let Some(contents) = continuation.continuation_contents else { + break 'i; + }; - let mut channel = SignalerChannelInner::with_topic(topic, options.tango_api_key.as_ref().unwrap()); - channel.choose_server().await?; - channel.init_session().await?; + for event in contents + .live_chat_continuation + .actions + .unwrap_or_default() + .into_iter() + .filter_map(|act| ChatEvent::from_action(&act.action)) + { + yielder.r#yield(event).await; + } - for action in chunk.iter() { - match action { - Action::AddChatItem { item, .. } => { - if !seen_messages.contains(item.id()) { - yielder.r#yield(action.to_owned()).await; - seen_messages.insert(item.id().to_owned()); - } - } - Action::ReplayChat { actions, .. } => { - for action in actions { - if let Action::AddChatItem { .. } = action.action { - yielder.r#yield(action.action.to_owned()).await; - } - } - } - action => { - yielder.r#yield(action.to_owned()).await; - } - } - } - - 'i: loop { - match chunk.cont().await { - Some(Ok(c)) => chunk = c, - Some(Err(e)) => { - tracing::error!(source = ?e, trigger = "between-sessions", "Failed to fetch continuation"); - } - _ => break 'i - }; - - for action in chunk.iter() { - match action { - Action::AddChatItem { item, .. } => { - if !seen_messages.contains(item.id()) { - yielder.r#yield(action.to_owned()).await; - seen_messages.insert(item.id().to_owned()); - } - } - Action::ReplayChat { actions, .. } => { - for action in actions { - if let Action::AddChatItem { .. } = action.action { - yielder.r#yield(action.action.to_owned()).await; - } - } - } - action => { - yielder.r#yield(action.to_owned()).await; - } - } - } - - let mut req = { - channel.reset(); - channel.choose_server().await?; - channel.init_session().await?; - channel.get_session_stream().await? - }; - loop { - match req.chunk().await { - Ok(Some(s)) => { - let mut ofs_res_line = s.lines().nth(1).unwrap().unwrap(); - if let Ok(s) = unsafe { simd_json::from_str::(ofs_res_line.as_mut()) } { - let a = s.as_array().unwrap(); - { - channel.aid = a[a.len() - 1].as_array().unwrap()[0].as_usize().unwrap(); - } - } + let Some(Continuation::Invalidation { continuation: next_token, .. }) = + contents.live_chat_continuation.continuations.first() + else { + break 'i; + }; - match chunk.cont().await { - Some(Ok(c)) => chunk = c, - Some(Err(e)) => { - tracing::error!(source = ?e, trigger = "signal", "Failed to fetch continuation"); + continuation_token.clear(); + continuation_token.push_str(next_token); } - _ => break 'i - }; - channel.topic = chunk.signaler_topic.clone().unwrap(); - - for action in chunk.iter() { - match action { - Action::AddChatItem { item, .. } => { - if !seen_messages.contains(item.id()) { - yielder.r#yield(action.to_owned()).await; - seen_messages.insert(item.id().to_owned()); - } - } - Action::ReplayChat { actions, .. } => { - for action in actions { - if let Action::AddChatItem { .. } = action.action { - yielder.r#yield(action.action.to_owned()).await; - } - } - } - action => { - yielder.r#yield(action.to_owned()).await; - } + Ok(None) => break, + Err(e) => { + tracing::error!(source = ?e, "signaler stream errored"); + break; } } } - Ok(None) => break, - Err(e) => { - tracing::error!(source = ?e, "Signaler stream errored"); - break; - } } - } - - seen_messages.clear(); - } + Ok(()) + })) + }) } - Continuation::Replay { .. } => { - let mut chunk = ActionChunk::new(initial_chat, options)?; - loop { - for action in chunk.iter() { - match action { - Action::AddChatItem { .. } => { - yielder.r#yield(action.to_owned()).await; - } - Action::ReplayChat { actions, .. } => { - for action in actions { - if let Action::AddChatItem { .. } = action.action { - yielder.r#yield(action.action.to_owned()).await; - } - } - } - action => { - yielder.r#yield(action.to_owned()).await; + Continuation::Replay { continuation, .. } => { + let continuation_token = continuation.to_string(); + let events: Vec = contents + .live_chat_continuation + .actions + .unwrap_or_default() + .into_iter() + .filter_map(|act| ChatEvent::from_action(&act.action)) + .collect(); + + let _ = initial_continuation; + let _ = initial_continuation_bytes; + + Ok(Self { + initial_events: Vec::default(), + stream: Box::pin(try_async_stream(move |yielder| async move { + let mut continuation_token = continuation_token; + let mut events = events; + loop { + for event in events.drain(..) { + yielder.r#yield(event).await; } - } - } - match chunk.cont().await { - Some(Ok(e)) => chunk = e, - _ => break - } - } - } - Continuation::Timed { timeout_ms, .. } => { - let timeout = Duration::from_millis(*timeout_ms as _); - let mut chunk = ActionChunk::new(initial_chat, options)?; - loop { - for action in chunk.iter() { - match action { - Action::AddChatItem { item, .. } => { - if !seen_messages.contains(item.id()) { - yielder.r#yield(action.to_owned()).await; - seen_messages.insert(item.id().to_owned()); + + let mut continuation = context + .client + .chat_replay(GetLiveChatRequest { continuation: &continuation_token }) + .await? + .with_innertube_error() + .await? + .recv_all() + .await + .map_err(ChatError::Receive)?; + let continuation: GetLiveChatResponse<'_> = simd_json::from_slice(&mut continuation)?; + let Some(contents) = continuation.continuation_contents else { + break; + }; + + for action in contents.live_chat_continuation.actions.unwrap_or_default() { + if let Action::ReplayChat { actions, .. } = action.action { + events.extend(actions.into_iter().filter_map(|act| ChatEvent::from_action(&act.action))); } } - Action::ReplayChat { actions, .. } => { - for action in actions { - if let Action::AddChatItem { .. } = action.action { - yielder.r#yield(action.action.to_owned()).await; - } + + let Some(Continuation::Replay { continuation: next_token, .. }) = contents.live_chat_continuation.continuations.first() else { + for event in events.drain(..) { + yielder.r#yield(event).await; } - } - action => { - yielder.r#yield(action.to_owned()).await; - } + break; + }; + + continuation_token.clear(); + continuation_token.push_str(next_token); } - } - sleep(timeout).await; - match chunk.cont().await { - Some(Ok(e)) => chunk = e, - _ => break - } - } + Ok(()) + })) + }) } - Continuation::PlayerSeek { .. } => panic!("player seek should not be first continuation") + Continuation::Timed { .. } => unimplemented!("Continuation::Timed"), + Continuation::PlayerSeek { .. } => unreachable!("PlayerSeek shouldn't be the first continuation") } - Ok(()) - }))) + } + + pub fn initial_events(&mut self) -> impl Iterator + '_ { + self.initial_events.drain(..) + } +} + +impl Stream for Chat { + type Item = Result>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + self.project().stream.poll_next(cx) + } +} + +#[derive(Debug)] +pub enum ChatError { + NoChat, + Deserialize(simd_json::Error), + Client(ClientError), + Receive(::Error), + Signaler(SignalerError), + Innertube(InnertubeError) +} + +impl From for ChatError { + fn from(e: simd_json::Error) -> Self { + Self::Deserialize(e) + } +} +impl From> for ChatError { + fn from(e: ClientError) -> Self { + Self::Client(e) + } +} +impl From for ChatError { + fn from(e: InnertubeError) -> Self { + Self::Innertube(e) + } +} +impl From> for ChatError { + fn from(e: SignalerError) -> Self { + Self::Signaler(e) + } +} + +impl fmt::Display for ChatError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::NoChat => f.write_str("stream has no chat"), + Self::Deserialize(e) => f.write_fmt(format_args!("failed to deserialize response: {e}")), + Self::Client(e) => fmt::Display::fmt(e, f), + Self::Receive(e) => f.write_fmt(format_args!("failed to receive response: {e}")), + Self::Signaler(e) => f.write_fmt(format_args!("real-time channel failed: {e}")), + Self::Innertube(e) => fmt::Display::fmt(e, f) + } + } +} + +impl StdError for ChatError +where + E::Response: fmt::Debug +{ + fn cause(&self) -> Option<&dyn StdError> { + match self { + Self::Deserialize(e) => Some(e), + Self::Client(e) => Some(e), + Self::Receive(e) => Some(e), + Self::Signaler(e) => Some(e), + Self::Innertube(e) => Some(e), + _ => None + } + } } diff --git a/src/youtube/signaler.rs b/src/youtube/signaler.rs index bd18d48..ab560e2 100644 --- a/src/youtube/signaler.rs +++ b/src/youtube/signaler.rs @@ -12,33 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashMap, io::BufRead, iter}; +use std::{error::Error as StdError, fmt, io::BufRead, iter}; -use rand::Rng; -use reqwest::{Response, header}; +use bytes::Bytes; +use http::{HeaderName, HeaderValue, Method, Uri, header, uri::PathAndQuery}; use simd_json::{ OwnedValue, base::{ValueAsArray, ValueAsScalar} }; -use url::Url; -use super::{Error, util::SimdJsonResponseBody}; - -const GCM_SIGNALER_SRQE: &str = "https://signaler-pa.youtube.com/punctual/v1/chooseServer"; -const GCM_SIGNALER_PSUB: &str = "https://signaler-pa.youtube.com/punctual/multi-watch/channel"; +use super::client::{Client, ClientError, RequestExecutor, Response}; #[derive(Debug, Default)] -pub struct SignalerChannelInner { +pub struct SignalerChannel { pub(crate) topic: String, tango_key: String, gsessionid: Option, sid: Option, rid: usize, - pub(crate) aid: usize, - session_n: usize + pub(crate) aid: usize } -impl SignalerChannelInner { +impl SignalerChannel { pub fn with_topic(topic: impl ToString, tango_key: impl ToString) -> Self { Self { topic: topic.to_string(), @@ -52,62 +47,74 @@ impl SignalerChannelInner { self.sid = None; self.rid = 0; self.aid = 0; - self.session_n = 0; } fn gen_zx() -> String { const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789"; - let mut rng = rand::rng(); - iter::repeat_with(|| CHARSET[rng.random_range(0..CHARSET.len())] as char) + iter::repeat_with(|| CHARSET[fastrand::usize(0..CHARSET.len())] as char) .take(11) .collect() } - pub async fn choose_server(&mut self) -> Result<(), Error> { - let server_response: OwnedValue = super::get_http_client() - .post(Url::parse_with_params(GCM_SIGNALER_SRQE, [("key", &self.tango_key)])?) - .header(header::CONTENT_TYPE, "application/json+protobuf") - .body(format!(r#"[[null,null,null,[8,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]]]]"#, self.topic)) - .send() - .await? - .simd_json() - .await?; + pub async fn choose_server(&mut self, client: &Client) -> Result<(), SignalerError> { + let request = client + .base_request( + Uri::builder() + .scheme("https") + .authority("signaler-pa.youtube.com") + .path_and_query( + format!("/punctual/v1/chooseServer?key={}", self.tango_key) + .parse::() + .expect("invalid path") + ) + .build() + .expect("invalid URI") + ) + .method(Method::POST) + .header(header::CONTENT_TYPE, HeaderValue::from_static("application/json+protobuf")) + .body(format!(r#"[[null,null,null,[8,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]]]]"#, self.topic).into()) + .expect("invalid request"); + let mut server_response = client.execute(request).await?.recv_all().await.map_err(SignalerError::Receive)?; + let server_response: simd_json::BorrowedValue<'_> = simd_json::from_slice(&mut server_response)?; let gsess = server_response.as_array().unwrap()[0].as_str().unwrap(); self.gsessionid = Some(gsess.to_owned()); Ok(()) } - pub async fn init_session(&mut self) -> Result<(), Error> { - let mut ofs_parameters = HashMap::new(); - ofs_parameters.insert("count", "1".to_string()); - ofs_parameters.insert("ofs", "0".to_string()); - ofs_parameters.insert( - "req0___data__", - format!(r#"[[["1",[null,null,null,[8,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]],null,null,1],null,3]]]"#, self.topic) + pub async fn init_session(&mut self, client: &Client) -> Result<(), SignalerError> { + let ofs_parameters = format!( + // [[["1",[null,null,null,[8,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]],null,null,1],null,3]]] + "count=1&ofs=0&req0___data__=%5B%5B%5B%221%22%2C%5Bnull%2Cnull%2Cnull%2C%5B8%2C5%5D%2Cnull%2C%5B%5B%22youtube_live_chat_web%22%5D%2C%5B1%5D%2C%5B%5B%5B%22{}%22%5D%5D%5D%5D%2Cnull%2Cnull%2C1%5D%2Cnull%2C3%5D%5D%5D", + self.topic ); - self.session_n = 1; - let ofs = super::get_http_client() - .post(Url::parse_with_params( - GCM_SIGNALER_PSUB, - [ - ("VER", "8"), - ("gsessionid", self.gsessionid.as_ref().unwrap()), - ("key", &self.tango_key), - ("RID", &self.rid.to_string()), - ("CVER", "22"), - ("zx", Self::gen_zx().as_ref()), - ("t", "1") - ] - )?) - // yes, this is required. why? who the fuck knows! but if you don't provide this, you get the typical google - // robot error complaining about an invalid request body when you GET GCM_SIGNALER_PSUB. yes, invalid request - // body, in a GET request. where the error actually refers to this POST request. because that makes sense. - .header("X-WebChannel-Content-Type", "application/json+protobuf") - .form(&ofs_parameters) - .send() - .await?; - - let mut ofs_res_line = ofs.bytes().await?.lines().nth(1).unwrap().unwrap(); + + let request = client + .base_request( + Uri::builder() + .scheme("https") + .authority("signaler-pa.youtube.com") + .path_and_query( + format!( + "/punctual/multi-watch/channel?VER=8&gsessionid={gsi}&key={tango_key}&RID={rid}&CVER=22&zx={zx}&t=1", + gsi = self.gsessionid.as_ref().expect("should have chosen server by now"), + tango_key = self.tango_key, + rid = "0", // self.rid + zx = Self::gen_zx() + ) + .parse::() + .expect("invalid path") + ) + .build() + .expect("invalid URI") + ) + .method(Method::POST) + .header(HeaderName::from_static("x-webchannel-content-type"), HeaderValue::from_static("application/json+protobuf")) + .header(header::CONTENT_TYPE, HeaderValue::from_static("application/x-www-form-urlencoded")) + .body(ofs_parameters.into()) + .expect("invalid request"); + let ofs = client.execute(request).await?.recv_all().await.map_err(SignalerError::Receive)?; + + let mut ofs_res_line = ofs.lines().nth(1).unwrap().expect("shouldn't fail"); let value: OwnedValue = unsafe { simd_json::from_str(&mut ofs_res_line) }?; let value = value.as_array().unwrap()[0].as_array().unwrap(); // first value might be 1 if the request has an error, not entirely sure @@ -117,25 +124,76 @@ impl SignalerChannelInner { Ok(()) } - pub async fn get_session_stream(&self) -> Result { - Ok(super::get_http_client() - .get(Url::parse_with_params( - GCM_SIGNALER_PSUB, - [ - ("VER", "8"), - ("gsessionid", self.gsessionid.as_ref().unwrap()), - ("key", &self.tango_key), - ("RID", "rpc"), - ("SID", self.sid.as_ref().unwrap()), - ("AID", &self.aid.to_string()), - ("CI", "0"), - ("TYPE", "xmlhttp"), - ("zx", &Self::gen_zx()), - ("t", "1") - ] - )?) - .header(header::CONNECTION, "keep-alive") - .send() - .await?) + pub async fn get_session_stream(&self, client: &Client) -> Result> { + let request = client + .base_request( + Uri::builder() + .scheme("https") + .authority("signaler-pa.youtube.com") + .path_and_query( + format!( + "/punctual/multi-watch/channel?VER=8&gsessionid={gsi}&key={tango_key}&RID=rpc&SID={sid}&AID={aid}&CI=0&TYPE=xmlhttp&zx={zx}&t=1", + gsi = self.gsessionid.as_ref().expect("should have chosen server by now"), + tango_key = self.tango_key, + sid = self.sid.as_ref().expect("should have SID by now"), + aid = self.aid, + zx = Self::gen_zx() + ) + .parse::() + .expect("invalid path") + ) + .build() + .expect("invalid URI") + ) + .method(Method::GET) + .header(header::CONNECTION, HeaderValue::from_static("keep-alive")) + .body(Bytes::new()) + .expect("invalid request"); + let res = client.execute(request).await?; + Ok(res) + } +} + +#[derive(Debug)] +pub enum SignalerError { + NoChat, + Deserialize(simd_json::Error), + Client(ClientError), + Receive(::Error) +} + +impl From for SignalerError { + fn from(e: simd_json::Error) -> Self { + Self::Deserialize(e) + } +} +impl From> for SignalerError { + fn from(e: ClientError) -> Self { + Self::Client(e) + } +} + +impl fmt::Display for SignalerError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::NoChat => f.write_str("stream has no chat"), + Self::Deserialize(e) => f.write_fmt(format_args!("failed to deserialize response: {e}")), + Self::Client(e) => fmt::Display::fmt(e, f), + Self::Receive(e) => f.write_fmt(format_args!("failed to receive response: {e}")) + } + } +} + +impl StdError for SignalerError +where + E::Response: fmt::Debug +{ + fn cause(&self) -> Option<&dyn StdError> { + match self { + Self::Deserialize(e) => Some(e), + Self::Client(e) => Some(e), + Self::Receive(e) => Some(e), + _ => None + } } } diff --git a/src/youtube/types/browse.rs b/src/youtube/types/browse.rs new file mode 100644 index 0000000..67a2d97 --- /dev/null +++ b/src/youtube/types/browse.rs @@ -0,0 +1,118 @@ +// Copyright 2024 pyke.io +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::{Deserialize, Serialize}; + +use super::{LocalizedText, deserialize_number_from_string}; + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BrowseRequest<'s> { + pub browse_id: &'s str, + pub params: Option<&'s str> +} + +#[derive(Debug, Deserialize)] +pub struct BrowseResponse<'s> { + #[serde(borrow)] + pub contents: BrowseResponseContents<'s> +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum BrowseResponseContents<'s> { + TwoColumnBrowseResultsRenderer { + #[serde(borrow)] + tabs: Vec> + } +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum TabItemRenderer<'s> { + TabRenderer { + #[serde(default)] + selected: bool, + #[serde(borrow)] + content: Option> + }, + #[serde(untagged)] + #[expect(unused)] + Other(#[serde(borrow)] simd_json::BorrowedValue<'s>) +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum FeedContentsRenderer<'s> { + RichGridRenderer { + #[serde(borrow)] + contents: Vec> + }, + #[serde(untagged)] + #[expect(unused)] + Other(#[serde(borrow)] simd_json::BorrowedValue<'s>) +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum RichGridItem<'s> { + #[serde(rename_all = "camelCase")] + RichItemRenderer { + #[serde(borrow)] + content: RichItemContent<'s> + }, + #[serde(untagged)] + #[expect(unused)] + Other(#[serde(borrow)] simd_json::BorrowedValue<'s>) +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum RichItemContent<'s> { + #[serde(rename_all = "camelCase")] + VideoRenderer { + #[serde(borrow)] + title: LocalizedText<'s>, + // #[serde(borrow)] + // thumbnail: ImageContainer<'s>, + #[serde(borrow)] + thumbnail_overlays: Vec>, + video_id: &'s str, + upcoming_event_data: Option + } +} + +#[derive(Debug, Deserialize)] +pub struct UpcomingEventData { + #[serde(rename = "startTime")] + #[serde(deserialize_with = "deserialize_number_from_string")] + pub start_time_secs: u64 +} + +#[derive(Debug, Deserialize)] +pub enum ThumbnailOverlay<'s> { + #[serde(rename = "thumbnailOverlayTimeStatusRenderer")] + TimeStatus { style: VideoTimeStatus }, + #[serde(untagged)] + #[expect(unused)] + Other(#[serde(borrow)] simd_json::BorrowedValue<'s>) +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum VideoTimeStatus { + Upcoming, + Live, + Default +} diff --git a/src/youtube/types/get_live_chat.rs b/src/youtube/types/get_live_chat.rs index af0d893..5aad6fa 100644 --- a/src/youtube/types/get_live_chat.rs +++ b/src/youtube/types/get_live_chat.rs @@ -12,145 +12,101 @@ // See the License for the specific language governing permissions and // limitations under the License. -use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use serde_aux::prelude::*; -use url::Url; -use super::{Accessibility, CommandMetadata, Icon, ImageContainer, LocalizedText, UnlocalizedText, deserialize_datetime_utc_from_microseconds}; -use crate::youtube::{ - ChatContext, Error, TANGO_LIVE_ENDPOINT, TANGO_REPLAY_ENDPOINT, get_http_client, - util::{SimdJsonRequestBody, SimdJsonResponseBody} -}; +use super::{Accessibility, Icon, ImageContainer, LocalizedText, UnlocalizedText, deserialize_number_from_string}; #[derive(Serialize, Debug)] -pub struct GetLiveChatRequestBody { - context: GetLiveChatRequestBodyContext, - continuation: String -} - -impl GetLiveChatRequestBody { - pub(crate) fn new(continuation: impl Into, client_version: impl Into, client_name: impl Into) -> Self { - Self { - context: GetLiveChatRequestBodyContext { - client: GetLiveChatRequestBodyContextClient { - client_version: client_version.into(), - client_name: client_name.into() - } - }, - continuation: continuation.into() - } - } -} - -#[derive(Serialize, Debug)] -pub struct GetLiveChatRequestBodyContext { - client: GetLiveChatRequestBodyContextClient -} - -#[derive(Serialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct GetLiveChatRequestBodyContextClient { - client_version: String, - client_name: String +pub struct GetLiveChatRequest<'s> { + pub continuation: &'s str } #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] -pub struct GetLiveChatResponse { - pub response_context: Option, - pub continuation_contents: Option -} - -impl GetLiveChatResponse { - pub async fn fetch(options: &ChatContext, continuation: impl AsRef) -> Result { - let body = GetLiveChatRequestBody::new(continuation.as_ref(), &options.client_version, "WEB"); - Ok(get_http_client() - .post(Url::parse_with_params( - if options.live_status.updates_live() { TANGO_LIVE_ENDPOINT } else { TANGO_REPLAY_ENDPOINT }, - [("prettyPrint", "false")] - )?) - .simd_json(&body)? - .send() - .await? - .simd_json() - .await?) - } +pub struct GetLiveChatResponse<'s> { + #[serde(borrow)] + pub continuation_contents: Option> } #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] -pub struct GetLiveChatResponseContinuationContents { - pub live_chat_continuation: LiveChatContinuation +pub struct GetLiveChatResponseContinuationContents<'s> { + #[serde(borrow)] + pub live_chat_continuation: LiveChatContinuation<'s> } #[derive(Deserialize, Debug)] -pub struct LiveChatContinuation { - pub continuations: Vec, - pub actions: Option> +pub struct LiveChatContinuation<'s> { + #[serde(borrow)] + pub continuations: Vec>, + #[serde(borrow)] + pub actions: Option>> } #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub struct ActionContainer { - #[serde(flatten)] - pub action: Action, - pub click_tracking_params: Option +pub struct ActionContainer<'s> { + #[serde(borrow, flatten)] + pub action: Action<'s> } #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] -pub enum Continuation { +pub enum Continuation<'s> { #[serde(rename = "invalidationContinuationData")] #[serde(rename_all = "camelCase")] Invalidation { - invalidation_id: InvalidationId, - timeout_ms: usize, - continuation: String + #[serde(borrow)] + invalidation_id: InvalidationId<'s>, + // timeout_ms: usize, + continuation: &'s str }, #[serde(rename = "timedContinuationData")] #[serde(rename_all = "camelCase")] - Timed { timeout_ms: usize, continuation: String }, + Timed { timeout_ms: usize, continuation: &'s str }, #[serde(rename = "liveChatReplayContinuationData")] #[serde(rename_all = "camelCase")] - Replay { time_until_last_message_msec: usize, continuation: String }, + Replay { continuation: &'s str }, #[serde(rename = "playerSeekContinuationData")] #[serde(rename_all = "camelCase")] - PlayerSeek { continuation: String } + #[allow(unused)] + PlayerSeek { continuation: &'s str } } #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] -pub struct InvalidationId { - pub object_source: usize, - pub object_id: String, - pub topic: String, - pub subscribe_to_gcm_topics: bool, - pub proto_creation_timestamp_ms: String +pub struct InvalidationId<'s> { + pub topic: &'s str } #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub enum Action { +pub enum Action<'s> { #[serde(rename = "addChatItemAction")] #[serde(rename_all = "camelCase")] - AddChatItem { item: ChatItem, client_id: Option }, + AddChatItem { + #[serde(borrow)] + item: ChatItem<'s>, + client_id: Option<&'s str> + }, #[serde(rename = "replaceChatItemAction")] #[serde(rename_all = "camelCase")] - ReplaceChatItem { target_item_id: String, replacement_item: ChatItem }, + ReplaceChatItem { + target_item_id: &'s str, + #[serde(borrow)] + replacement_item: ChatItem<'s> + }, #[serde(rename = "removeChatItemAction")] #[serde(rename_all = "camelCase")] - RemoveChatItem { target_item_id: String }, + RemoveChatItem { target_item_id: &'s str }, #[serde(rename = "removeChatItemByAuthorAction")] #[serde(rename_all = "camelCase")] - RemoveChatItemByAuthor { external_channel_id: String }, - #[serde(rename = "addLiveChatTickerItemAction")] - #[serde(rename_all = "camelCase")] - AddLiveChatTicker { item: simd_json::OwnedValue }, + RemoveChatItemByAuthor { external_channel_id: &'s str }, #[serde(rename = "replayChatItemAction")] #[serde(rename_all = "camelCase")] ReplayChat { - actions: Vec, + #[serde(borrow)] + actions: Vec>, #[serde(deserialize_with = "deserialize_number_from_string")] video_offset_time_msec: i64 }, @@ -166,62 +122,58 @@ pub enum Action { #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub struct AuthorBadge { - pub live_chat_author_badge_renderer: LiveChatAuthorBadgeRenderer +pub struct AuthorBadge<'s> { + #[serde(borrow)] + pub live_chat_author_badge_renderer: LiveChatAuthorBadgeRenderer<'s> } #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub struct LiveChatAuthorBadgeRenderer { - pub custom_thumbnail: Option, - pub icon: Option, - pub tooltip: String, - pub accessibility: Accessibility +pub struct LiveChatAuthorBadgeRenderer<'s> { + #[serde(borrow)] + pub custom_thumbnail: Option>, + #[serde(borrow)] + pub icon: Option>, + pub tooltip: &'s str, + #[serde(borrow)] + pub accessibility: Accessibility<'s> } #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub struct MessageRendererBase { - pub author_name: Option, - pub author_photo: ImageContainer, - pub author_badges: Option>, - pub context_menu_endpoint: ContextMenuEndpoint, - pub id: String, - #[serde(deserialize_with = "deserialize_datetime_utc_from_microseconds")] - pub timestamp_usec: DateTime, - pub author_external_channel_id: String, - pub context_menu_accessibility: Accessibility -} - -#[derive(Deserialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -pub struct ContextMenuEndpoint { - pub command_metadata: CommandMetadata, - pub live_chat_item_context_menu_endpoint: LiveChatItemContextMenuEndpoint -} - -#[derive(Deserialize, Debug, Clone)] -pub struct LiveChatItemContextMenuEndpoint { - pub params: String +pub struct MessageRendererBase<'s> { + pub id: &'s str, + #[serde(borrow)] + pub author_name: Option>, + #[serde(borrow)] + pub author_photo: ImageContainer<'s>, + #[serde(borrow)] + pub author_badges: Option>>, + #[serde(deserialize_with = "deserialize_number_from_string")] + pub timestamp_usec: i64, + pub author_external_channel_id: &'s str } #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub enum ChatItem { +pub enum ChatItem<'s> { #[serde(rename = "liveChatTextMessageRenderer")] #[serde(rename_all = "camelCase")] TextMessage { - #[serde(flatten)] - message_renderer_base: MessageRendererBase, - message: Option + #[serde(borrow, flatten)] + message_renderer_base: MessageRendererBase<'s>, + #[serde(borrow)] + message: Option> }, #[serde(rename = "liveChatPaidMessageRenderer")] #[serde(rename_all = "camelCase")] Superchat { - #[serde(flatten)] - message_renderer_base: MessageRendererBase, - message: Option, - purchase_amount_text: UnlocalizedText, + #[serde(borrow, flatten)] + message_renderer_base: MessageRendererBase<'s>, + #[serde(borrow)] + message: Option>, + #[serde(borrow)] + purchase_amount_text: UnlocalizedText<'s>, header_background_color: isize, header_text_color: isize, body_background_color: isize, @@ -231,18 +183,22 @@ pub enum ChatItem { #[serde(rename = "liveChatMembershipItemRenderer")] #[serde(rename_all = "camelCase")] MembershipItem { - #[serde(flatten)] - message_renderer_base: MessageRendererBase, - header_sub_text: Option, - author_badges: Option> + #[serde(borrow, flatten)] + message_renderer_base: MessageRendererBase<'s>, + #[serde(borrow)] + header_sub_text: Option>, + #[serde(borrow)] + author_badges: Option>> }, #[serde(rename = "liveChatPaidStickerRenderer")] #[serde(rename_all = "camelCase")] PaidSticker { - #[serde(flatten)] - message_renderer_base: MessageRendererBase, - purchase_amount_text: UnlocalizedText, - sticker: ImageContainer, + #[serde(borrow, flatten)] + message_renderer_base: MessageRendererBase<'s>, + #[serde(borrow)] + purchase_amount_text: UnlocalizedText<'s>, + #[serde(borrow)] + sticker: ImageContainer<'s>, money_chip_background_color: isize, money_chip_text_color: isize, sticker_display_width: isize, @@ -268,8 +224,8 @@ pub enum ChatItem { #[serde(rename_all = "camelCase")] Placeholder { id: String, - #[serde(deserialize_with = "deserialize_datetime_utc_from_microseconds")] - timestamp_usec: DateTime + #[serde(deserialize_with = "deserialize_number_from_string")] + timestamp_usec: i64 }, #[serde(rename = "liveChatViewerEngagementMessageRenderer")] ViewerEngagement { id: String }, @@ -277,13 +233,13 @@ pub enum ChatItem { Unknown(simd_json::OwnedValue) } -impl ChatItem { +impl ChatItem<'_> { pub fn id(&self) -> &str { match self { - ChatItem::MembershipItem { message_renderer_base, .. } => &message_renderer_base.id, - ChatItem::PaidSticker { message_renderer_base, .. } => &message_renderer_base.id, - ChatItem::Superchat { message_renderer_base, .. } => &message_renderer_base.id, - ChatItem::TextMessage { message_renderer_base, .. } => &message_renderer_base.id, + ChatItem::MembershipItem { message_renderer_base, .. } => message_renderer_base.id, + ChatItem::PaidSticker { message_renderer_base, .. } => message_renderer_base.id, + ChatItem::Superchat { message_renderer_base, .. } => message_renderer_base.id, + ChatItem::TextMessage { message_renderer_base, .. } => message_renderer_base.id, ChatItem::MembershipGift { id, .. } => id, ChatItem::MembershipGiftRedemption { id, .. } => id, ChatItem::Placeholder { id, .. } => id, diff --git a/src/youtube/types/mod.rs b/src/youtube/types/mod.rs index e07cdcf..dfd26af 100644 --- a/src/youtube/types/mod.rs +++ b/src/youtube/types/mod.rs @@ -12,48 +12,110 @@ // See the License for the specific language governing permissions and // limitations under the License. -use serde::{Deserialize, Deserializer, de::Error}; -use serde_aux::field_attributes::deserialize_number_from_string; -use simd_json::OwnedValue; +use std::{fmt, str::FromStr}; +use serde::{Deserialize, Deserializer, Serialize}; + +use crate::youtube::client::{Client, RequestExecutor}; + +pub mod browse; pub mod get_live_chat; -pub mod streams_page; +pub mod video; -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Debug)] +pub struct InnertubeRequest<'s, T: Serialize + 's> { + context: InnertubeRequestContext<'s>, + #[serde(flatten)] + body: T +} + +impl<'s, T: Serialize + 's> InnertubeRequest<'s, T> { + pub(crate) fn new<'r: 's, E: RequestExecutor>(client: &'r Client, body: T) -> Self { + Self { + context: client.request_context(), + body + } + } +} + +#[derive(Serialize, Debug)] +pub struct InnertubeRequestContext<'s> { + pub client: InnertubeRequestContextClient<'s> +} + +#[derive(Serialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub struct CommandMetadata { - pub web_command_metadata: OwnedValue +pub struct InnertubeRequestContextClient<'c> { + pub client_version: &'c str, + pub client_name: &'c str +} + +#[derive(Deserialize, Debug)] +#[serde(untagged, rename_all = "camelCase")] +pub enum InnertubeResponse<'s, T> { + Error { + #[serde(borrow)] + error: InnertubeError<'s> + }, + Success { + #[serde(flatten)] + data: T + } +} + +#[derive(Deserialize, Debug)] +pub struct InnertubeError<'s> { + pub code: u16, + pub message: &'s str, + pub errors: Vec>, + pub status: &'s str +} + +#[derive(Deserialize, Debug)] +pub struct InnertubeErrorDetail<'s> { + pub message: &'s str, + pub domain: &'s str, + pub reason: &'s str } #[derive(Deserialize, Default, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub struct UnlocalizedText { - pub simple_text: String, - pub accessibility: Option +pub struct UnlocalizedText<'s> { + pub simple_text: &'s str, + #[serde(borrow)] + pub accessibility: Option> } #[derive(Deserialize, Debug, Clone)] #[serde(untagged)] -pub enum LocalizedRun { +pub enum LocalizedRun<'s> { Text { - text: String + text: &'s str }, #[serde(rename_all = "camelCase")] Emoji { - emoji: Emoji, - variant_ids: Option> + emoji: Emoji<'s>, + #[serde(borrow)] + variant_ids: Option> } } -impl LocalizedRun { - pub fn to_chat_string(&self) -> String { +impl fmt::Display for LocalizedRun<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::Text { text } => text.to_owned(), + Self::Text { text } => f.write_str(text), Self::Emoji { emoji, .. } => { + let label = emoji + .image + .accessibility + .as_ref() + .expect("emojis should always have accessibility data") + .accessibility_data + .label; if let Some(true) = emoji.is_custom_emoji { - format!(":{}:", emoji.image.accessibility.as_ref().unwrap().accessibility_data.label) + f.write_fmt(format_args!(":{label}:")) } else { - emoji.image.accessibility.as_ref().unwrap().accessibility_data.label.to_owned() + f.write_str(label) } } } @@ -61,61 +123,63 @@ impl LocalizedRun { } #[derive(Deserialize, Debug, Clone)] -pub struct LocalizedText { - pub runs: Vec +pub struct LocalizedText<'s> { + #[serde(borrow)] + pub runs: Vec> } #[derive(Deserialize, Debug, Clone)] -pub struct ImageContainer { - pub thumbnails: Vec, - pub accessibility: Option +pub struct ImageContainer<'s> { + #[serde(borrow)] + pub thumbnails: Vec>, + #[serde(borrow)] + pub accessibility: Option> } #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub struct Accessibility { - pub accessibility_data: AccessibilityData +pub struct Accessibility<'s> { + #[serde(borrow)] + pub accessibility_data: AccessibilityData<'s> } #[derive(Deserialize, Debug, Clone)] -pub struct AccessibilityData { - pub label: String +pub struct AccessibilityData<'s> { + pub label: &'s str } #[derive(Deserialize, Debug, Clone)] -pub struct Thumbnail { - pub url: String, +pub struct Thumbnail<'s> { + pub url: &'s str, pub width: Option, pub height: Option } #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub struct Emoji { - pub emoji_id: String, - pub shortcuts: Option>, - pub search_terms: Option>, +pub struct Emoji<'s> { + pub emoji_id: &'s str, + #[serde(borrow)] + pub shortcuts: Option>, + #[serde(borrow)] + pub search_terms: Option>, pub supports_skin_tone: Option, - pub image: ImageContainer, + pub image: ImageContainer<'s>, pub is_custom_emoji: Option } #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub struct Icon { - pub icon_type: String +pub struct Icon<'s> { + pub icon_type: &'s str } -pub fn deserialize_datetime_utc_from_microseconds<'de, D>(deserializer: D) -> Result, D::Error> +pub fn deserialize_number_from_string<'de, T, D>(deserializer: D) -> Result where - D: Deserializer<'de> + D: Deserializer<'de>, + T: FromStr, + ::Err: std::error::Error { - use chrono::prelude::*; - - let number = deserialize_number_from_string::(deserializer)?; - let seconds = number / 1_000_000; - let micros = (number % 1_000_000) as u32; - let nanos = micros * 1_000; - - DateTime::from_timestamp(seconds, nanos).ok_or_else(|| D::Error::custom("Couldn't parse the timestamp")) + let t: &str = Deserialize::deserialize(deserializer)?; + t.parse().map_err(serde::de::Error::custom) } diff --git a/src/youtube/types/streams_page.rs b/src/youtube/types/streams_page.rs deleted file mode 100644 index 527f9a1..0000000 --- a/src/youtube/types/streams_page.rs +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2024 pyke.io -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use serde::Deserialize; - -use super::{Accessibility, CommandMetadata, ImageContainer, LocalizedText}; - -#[derive(Debug, Deserialize)] -pub struct YouTubeInitialData { - pub contents: PageContentsRenderer -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum PageContentsRenderer { - TwoColumnBrowseResultsRenderer { tabs: Vec } -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum TabItemRenderer { - TabRenderer { - title: String, - #[serde(default)] - selected: bool, - content: Option - }, - ExpandableTabRenderer {} -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum FeedContentsRenderer { - RichGridRenderer { - contents: Vec - }, - #[serde(other)] - Other -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum RichGridItem { - #[serde(rename_all = "camelCase")] - RichItemRenderer { content: RichItemContent }, - #[serde(rename_all = "camelCase")] - ContinuationItemRenderer { trigger: ContinuationItemTrigger } -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum RichItemContent { - #[serde(rename_all = "camelCase")] - VideoRenderer { - thumbnail: ImageContainer, - thumbnail_overlays: Vec, - video_id: String - } -} - -#[derive(Debug, Deserialize)] -pub enum ThumbnailOverlay { - #[serde(rename = "thumbnailOverlayTimeStatusRenderer")] - TimeStatus { - style: VideoTimeStatus // text: UnlocalizedText - }, - #[serde(rename = "thumbnailOverlayToggleButtonRenderer")] - #[serde(rename_all = "camelCase")] - ToggleButton { - is_toggled: Option, - toggled_accessibility: Accessibility, - toggled_tooltip: String, - untoggled_accessibility: Accessibility, - untoggled_tooltip: String - }, - #[serde(rename = "thumbnailOverlayNowPlayingRenderer")] - NowPlaying { text: LocalizedText } -} - -#[derive(Debug, Deserialize, PartialEq, Eq)] -#[serde(rename_all = "SCREAMING_SNAKE_CASE")] -pub enum VideoTimeStatus { - Upcoming, - Live, - Default -} - -#[derive(Debug, Deserialize, PartialEq, Eq)] -#[serde(rename_all = "SCREAMING_SNAKE_CASE")] -pub enum ContinuationItemTrigger { - ContinuationTriggerOnItemShown -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum FeedHeaderRenderer { - #[serde(rename_all = "camelCase")] - FeedFilterChipBarRenderer { contents: Vec, style_type: String } -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum FeedFilterChip { - #[serde(rename_all = "camelCase")] - ChipCloudChipRenderer { is_selected: bool } -} diff --git a/src/youtube/types/video.rs b/src/youtube/types/video.rs new file mode 100644 index 0000000..42fd93b --- /dev/null +++ b/src/youtube/types/video.rs @@ -0,0 +1,46 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct VideoRequest<'s> { + pub video_id: &'s str +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct VideoResponse<'s> { + #[serde(borrow)] + pub contents: VideoResponseContents<'s> +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub enum VideoResponseContents<'s> { + #[serde(rename_all = "camelCase")] + TwoColumnWatchNextResults { + #[serde(borrow)] + conversation_bar: Option> + } +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub enum ConversationBar<'s> { + #[serde(rename_all = "camelCase")] + LiveChatRenderer { + #[serde(borrow)] + continuations: Vec>, + #[serde(default)] + is_replay: bool + }, + #[serde(untagged)] + #[allow(unused)] + Other(#[serde(borrow)] simd_json::BorrowedValue<'s>) +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub enum ContinuationData<'s> { + #[serde(rename_all = "camelCase")] + ReloadContinuationData { continuation: &'s str } +} diff --git a/src/youtube/util.rs b/src/youtube/util.rs index dc566d8..e85e871 100644 --- a/src/youtube/util.rs +++ b/src/youtube/util.rs @@ -12,32 +12,199 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::future::Future; +use std::{ + error::Error as StdError, + fmt::{self, Write} +}; -use reqwest::{RequestBuilder, Response}; -use serde::{Serialize, de::DeserializeOwned}; +use super::client::{Client, ClientError, InnertubeError, RequestExecutor, Response, ResponseExt}; +use crate::youtube::{ + LocalizedRun, + types::browse::{ + BrowseRequest, BrowseResponse, BrowseResponseContents, FeedContentsRenderer, RichGridItem, RichItemContent, TabItemRenderer, ThumbnailOverlay, + VideoTimeStatus + } +}; + +pub(crate) const TANGO_API_KEY: &str = "AIzaSyDZNkyC-AtROwMBpLfevIvqYk-Gfi8ZOeo"; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StreamStatus { + Live, + Upcoming { scheduled_secs: u64 } +} + +#[derive(Debug)] +pub struct ChannelStream { + video_id: String, + title: String, + status: StreamStatus, + thumbnail_url: Option +} + +impl ChannelStream { + #[inline(always)] + pub fn id(&self) -> &str { + &self.video_id + } + + #[inline(always)] + pub fn title(&self) -> &str { + &self.title + } + + #[inline(always)] + pub fn status(&self) -> StreamStatus { + self.status + } + + #[inline(always)] + pub fn thumbnail_url(&self) -> Option<&str> { + self.thumbnail_url.as_deref() + } +} + +pub(crate) fn stringify_runs(runs: &[LocalizedRun<'_>]) -> String { + let mut s = String::new(); + for run in runs { + write!(&mut s, "{run}").expect("infallible"); + } + s +} + +pub async fn query_channel(channel_id: &str, client: &Client) -> Result, QueryChannelError> { + if !channel_id.starts_with("UC") || channel_id.len() != 24 { + return Err(QueryChannelError::InvalidChannelID); + } + + let mut browse_results = client + .browse(BrowseRequest { + browse_id: channel_id, + // streams tab + params: Some("EgdzdHJlYW1z8gYECgJ6AA%3D%3D") + }) + .await? + .with_innertube_error() + .await? + .recv_all() + .await + .map_err(QueryChannelError::Receive)?; + let browse_results: BrowseResponse<'_> = simd_json::from_slice(&mut browse_results)?; + + let BrowseResponseContents::TwoColumnBrowseResultsRenderer { tabs } = browse_results.contents; + let Some(TabItemRenderer::TabRenderer { content: stream_tab_renderer, .. }) = tabs.iter().find(|c| match c { + TabItemRenderer::TabRenderer { selected, content, .. } => content.is_some() && *selected, + _ => false + }) else { + tracing::warn!("Failed to find stream tab renderer"); + return Ok(Vec::new()); + }; + + let Some(FeedContentsRenderer::RichGridRenderer { contents: stream_items }) = stream_tab_renderer else { + tracing::warn!("Stream tab wasn't a `richGridRenderer`"); + return Ok(Vec::new()); + }; + + Ok(stream_items + .iter() + .filter_map(|c| match c { + RichGridItem::RichItemRenderer { content } => match content { + RichItemContent::VideoRenderer { + thumbnail_overlays, + video_id, + title, + upcoming_event_data, + .. + } => { + let time_status = thumbnail_overlays.iter().find_map(|c| match c { + ThumbnailOverlay::TimeStatus { style, .. } => Some(style), + _ => None + })?; -use super::Error; + if matches!(time_status, VideoTimeStatus::Default) { + return None; + } -pub trait SimdJsonResponseBody { - fn simd_json(self) -> impl Future>; + let video_id = video_id.to_string(); + let title = stringify_runs(&title.runs); + // let thumbnail = thumbnail.thumbnails.last().map(|c| c.url); + let thumbnail = format!("https://i.ytimg.com/vi/{video_id}/maxresdefault.jpg"); // 1280x720, innertube only gives us 336x118 at most + + match time_status { + VideoTimeStatus::Live => Some(ChannelStream { + video_id, + title, + thumbnail_url: Some(thumbnail), + status: StreamStatus::Live + }), + VideoTimeStatus::Upcoming => Some(ChannelStream { + video_id, + title, + thumbnail_url: Some(thumbnail), + status: StreamStatus::Upcoming { + scheduled_secs: upcoming_event_data + .as_ref() + .expect("`upcomingEventData` should be present for UPCOMING streams") + .start_time_secs + } + }), + _ => unreachable!() + } + } + }, + _ => None + }) + .collect()) +} + +#[derive(Debug)] +pub enum QueryChannelError { + InvalidChannelID, + Deserialize(simd_json::Error), + Client(ClientError), + Receive(::Error), + Innertube(InnertubeError) } -impl SimdJsonResponseBody for Response { - async fn simd_json(self) -> Result { - let mut full = self.bytes().await?.to_vec(); - Ok(simd_json::from_slice(&mut full)?) +impl From for QueryChannelError { + fn from(e: simd_json::Error) -> Self { + Self::Deserialize(e) + } +} +impl From> for QueryChannelError { + fn from(e: ClientError) -> Self { + Self::Client(e) + } +} +impl From for QueryChannelError { + fn from(e: InnertubeError) -> Self { + Self::Innertube(e) } } -pub trait SimdJsonRequestBody { - fn simd_json(self, json: &T) -> Result - where - Self: Sized; +impl fmt::Display for QueryChannelError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidChannelID => f.write_str("invalid channel ID"), + Self::Deserialize(e) => f.write_fmt(format_args!("failed to deserialize response: {e}")), + Self::Client(e) => fmt::Display::fmt(e, f), + Self::Receive(e) => f.write_fmt(format_args!("failed to receive response: {e}")), + Self::Innertube(e) => fmt::Display::fmt(e, f) + } + } } -impl SimdJsonRequestBody for RequestBuilder { - fn simd_json(self, json: &T) -> Result { - Ok(self.body(simd_json::to_vec(json)?)) +impl StdError for QueryChannelError +where + E::Response: fmt::Debug +{ + fn cause(&self) -> Option<&dyn StdError> { + match self { + Self::Deserialize(e) => Some(e), + Self::Client(e) => Some(e), + Self::Receive(e) => Some(e), + Self::Innertube(e) => Some(e), + _ => None + } } } From f76b9e90096ed465c74fa203984d493653ff8e4d Mon Sep 17 00:00:00 2001 From: "Carson M." Date: Thu, 4 Dec 2025 15:55:27 -0600 Subject: [PATCH 2/8] refactor signaler --- src/youtube/client.rs | 2 +- src/youtube/mod.rs | 98 +++++++++++++----------------------- src/youtube/signaler.rs | 107 ++++++++++++++++++++++++++++++++++------ 3 files changed, 128 insertions(+), 79 deletions(-) diff --git a/src/youtube/client.rs b/src/youtube/client.rs index 4f1d84f..51e046e 100644 --- a/src/youtube/client.rs +++ b/src/youtube/client.rs @@ -86,7 +86,7 @@ impl fmt::Display for InnertubeError { impl StdError for InnertubeError {} pub trait RequestExecutor: Send + Sync + 'static { - type Response: Response; + type Response: Response + 'static; type Error: StdError + Send; fn make_request(&self, req: http::Request) -> impl Future> + Send + Sync + '_; diff --git a/src/youtube/mod.rs b/src/youtube/mod.rs index 77e9325..b2fae8a 100644 --- a/src/youtube/mod.rs +++ b/src/youtube/mod.rs @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{error::Error as StdError, fmt, io::BufRead, pin::Pin, task::Poll}; +use std::{error::Error as StdError, fmt, pin::Pin, task::Poll}; use async_stream_lite::try_async_stream; -use futures_util::{Stream, stream::BoxStream}; +use futures_util::{Stream, StreamExt, pin_mut, stream::BoxStream}; use pin_project_lite::pin_project; -use simd_json::base::{ValueAsArray, ValueAsScalar}; mod client; mod context; @@ -152,67 +151,40 @@ impl Chat { let _ = continuation; let _ = continuation_bytes; - let mut req = { - channel.reset(); - channel.choose_server(&context.client).await?; - channel.init_session(&context.client).await?; - channel.get_session_stream(&context.client).await? - }; - loop { - match req.recv_chunk().await { - Ok(Some(s)) => { - let Some(mut ofs_res_line) = s.lines().nth(1).transpose().expect("infallible") else { - break; - }; - - if let Ok(s) = unsafe { simd_json::from_str::(ofs_res_line.as_mut()) } { - if let Some(a) = s.as_array() { - // channel.aid = a[a.len() - 1].as_array().unwrap()[0].as_usize().unwrap(); - if let Some(aid) = a.last().and_then(|x| x.as_array()).and_then(|x| x.first()).and_then(|x| x.as_usize()) { - channel.aid = aid; - } - } - } - - let mut continuation = context - .client - .chat_live(GetLiveChatRequest { continuation: &continuation_token }) - .await? - .with_innertube_error() - .await? - .recv_all() - .await - .map_err(ChatError::Receive)?; - let continuation: GetLiveChatResponse<'_> = simd_json::from_slice(&mut continuation)?; - let Some(contents) = continuation.continuation_contents else { - break 'i; - }; - - for event in contents - .live_chat_continuation - .actions - .unwrap_or_default() - .into_iter() - .filter_map(|act| ChatEvent::from_action(&act.action)) - { - yielder.r#yield(event).await; - } - - let Some(Continuation::Invalidation { continuation: next_token, .. }) = - contents.live_chat_continuation.continuations.first() - else { - break 'i; - }; - - continuation_token.clear(); - continuation_token.push_str(next_token); - } - Ok(None) => break, - Err(e) => { - tracing::error!(source = ?e, "signaler stream errored"); - break; - } + let signaler_stream = channel.stream(&context.client).await?; + pin_mut!(signaler_stream); + while let Some(()) = signaler_stream.next().await.transpose()? { + let mut continuation = context + .client + .chat_live(GetLiveChatRequest { continuation: &continuation_token }) + .await? + .with_innertube_error() + .await? + .recv_all() + .await + .map_err(ChatError::Receive)?; + let continuation: GetLiveChatResponse<'_> = simd_json::from_slice(&mut continuation)?; + let Some(contents) = continuation.continuation_contents else { + break 'i; + }; + + for event in contents + .live_chat_continuation + .actions + .unwrap_or_default() + .into_iter() + .filter_map(|act| ChatEvent::from_action(&act.action)) + { + yielder.r#yield(event).await; } + + let Some(Continuation::Invalidation { continuation: next_token, .. }) = contents.live_chat_continuation.continuations.first() + else { + break 'i; + }; + + continuation_token.clear(); + continuation_token.push_str(next_token); } } Ok(()) diff --git a/src/youtube/signaler.rs b/src/youtube/signaler.rs index ab560e2..6259396 100644 --- a/src/youtube/signaler.rs +++ b/src/youtube/signaler.rs @@ -14,7 +14,9 @@ use std::{error::Error as StdError, fmt, io::BufRead, iter}; +use async_stream_lite::try_async_stream; use bytes::Bytes; +use futures_util::Stream; use http::{HeaderName, HeaderValue, Method, Uri, header, uri::PathAndQuery}; use simd_json::{ OwnedValue, @@ -25,12 +27,12 @@ use super::client::{Client, ClientError, RequestExecutor, Response}; #[derive(Debug, Default)] pub struct SignalerChannel { - pub(crate) topic: String, + topic: String, tango_key: String, gsessionid: Option, sid: Option, rid: usize, - pub(crate) aid: usize + aid: usize } impl SignalerChannel { @@ -42,7 +44,7 @@ impl SignalerChannel { } } - pub fn reset(&mut self) { + fn reset(&mut self) { self.gsessionid = None; self.sid = None; self.rid = 0; @@ -56,7 +58,7 @@ impl SignalerChannel { .collect() } - pub async fn choose_server(&mut self, client: &Client) -> Result<(), SignalerError> { + async fn choose_server(&mut self, client: &Client) -> Result<(), SignalerError> { let request = client .base_request( Uri::builder() @@ -76,12 +78,21 @@ impl SignalerChannel { .expect("invalid request"); let mut server_response = client.execute(request).await?.recv_all().await.map_err(SignalerError::Receive)?; let server_response: simd_json::BorrowedValue<'_> = simd_json::from_slice(&mut server_response)?; - let gsess = server_response.as_array().unwrap()[0].as_str().unwrap(); - self.gsessionid = Some(gsess.to_owned()); + + if let Some(res) = server_response.as_array() + && let Some(gsess) = res.first().and_then(|x| x.as_str()) + { + self.gsessionid = Some(gsess.to_owned()); + } else { + return Err(SignalerError::Parse { + source: SignalerParseSource::ChooseServer + }); + } + Ok(()) } - pub async fn init_session(&mut self, client: &Client) -> Result<(), SignalerError> { + async fn init_session(&mut self, client: &Client) -> Result<(), SignalerError> { let ofs_parameters = format!( // [[["1",[null,null,null,[8,5],null,[["youtube_live_chat_web"],[1],[[["{}"]]]],null,null,1],null,3]]] "count=1&ofs=0&req0___data__=%5B%5B%5B%221%22%2C%5Bnull%2Cnull%2Cnull%2C%5B8%2C5%5D%2Cnull%2C%5B%5B%22youtube_live_chat_web%22%5D%2C%5B1%5D%2C%5B%5B%5B%22{}%22%5D%5D%5D%5D%2Cnull%2Cnull%2C1%5D%2Cnull%2C3%5D%5D%5D", @@ -114,17 +125,39 @@ impl SignalerChannel { .expect("invalid request"); let ofs = client.execute(request).await?.recv_all().await.map_err(SignalerError::Receive)?; - let mut ofs_res_line = ofs.lines().nth(1).unwrap().expect("shouldn't fail"); - let value: OwnedValue = unsafe { simd_json::from_str(&mut ofs_res_line) }?; - let value = value.as_array().unwrap()[0].as_array().unwrap(); + let parse_err = Err(SignalerError::Parse { + source: SignalerParseSource::SessionInit + }); + let Some(Ok(mut res_line)) = ofs.lines().nth(1) else { + return Err(SignalerError::Parse { + source: SignalerParseSource::SessionInit + }); + }; + let value: OwnedValue = unsafe { simd_json::from_str(&mut res_line) }?; + + let Some(data) = value.as_array().and_then(|x| x.first().and_then(|x| x.as_array())) else { + return parse_err; + }; + // first value might be 1 if the request has an error, not entirely sure - assert_eq!(value[0].as_usize().unwrap(), 0); - let sid = value[1].as_array().unwrap()[1].as_str().unwrap(); + if data.first().and_then(|x| x.as_usize()) != Some(0) { + return parse_err; + } + + let Some(sid) = data.get(1).and_then(|x| x.as_array()).and_then(|x| x.get(1).and_then(|x| x.as_str())) else { + return parse_err; + }; self.sid = Some(sid.to_owned()); + Ok(()) } - pub async fn get_session_stream(&self, client: &Client) -> Result> { + pub async fn stream(&mut self, client: &Client) -> Result>> + '_, SignalerError> { + // TODO: see if we can not need to reset state every time + self.reset(); + self.choose_server(client).await?; + self.init_session(client).await?; + let request = client .base_request( Uri::builder() @@ -149,14 +182,57 @@ impl SignalerChannel { .header(header::CONNECTION, HeaderValue::from_static("keep-alive")) .body(Bytes::new()) .expect("invalid request"); - let res = client.execute(request).await?; - Ok(res) + let mut res = client.execute(request).await?; + Ok(try_async_stream(|yielder| async move { + loop { + match res.recv_chunk().await { + Ok(Some(chunk)) => { + let mut lines = chunk.lines(); + let Some(Ok(event_id)) = lines.next() else { + break; + }; + + if event_id != "252" && event_id != "253" { + // 50, 51, and 53 are probably some internal stuff we don't care about. 252/253 seem to be correlated with new chat + // messages (though sometimes there aren't new chat messages at all and I'm not sure why). + // The channel starts off sending 252 but after a few seconds sends 253 instead. Not sure the difference between the + // two events, but they're both structured & function the same. + continue; + } + + let Some(Ok(mut ofs_res_line)) = lines.next() else { + break; + }; + + if let Ok(s) = unsafe { simd_json::from_str::(ofs_res_line.as_mut()) } + && let Some(a) = s.as_array() + && let Some(aid) = a.last().and_then(|x| x.as_array()).and_then(|x| x.first()).and_then(|x| x.as_usize()) + { + self.aid = aid; + } + + yielder.r#yield(()).await; + } + Ok(None) => break, + Err(e) => return Err(SignalerError::Receive(e)) + } + } + Ok(()) + })) } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SignalerParseSource { + ChooseServer, + SessionInit, + SessionStream +} + #[derive(Debug)] pub enum SignalerError { NoChat, + Parse { source: SignalerParseSource }, Deserialize(simd_json::Error), Client(ClientError), Receive(::Error) @@ -178,6 +254,7 @@ impl fmt::Display for SignalerError { match self { Self::NoChat => f.write_str("stream has no chat"), Self::Deserialize(e) => f.write_fmt(format_args!("failed to deserialize response: {e}")), + Self::Parse { source } => f.write_fmt(format_args!("couldn't parse response from {source:?}")), Self::Client(e) => fmt::Display::fmt(e, f), Self::Receive(e) => f.write_fmt(format_args!("failed to receive response: {e}")) } From b597e84cbc01d2d52c996db8e9539858866920af Mon Sep 17 00:00:00 2001 From: "Carson M." Date: Thu, 4 Dec 2025 18:35:32 -0600 Subject: [PATCH 3/8] implement `Continuation::Timed` --- src/youtube/mod.rs | 65 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 2 deletions(-) diff --git a/src/youtube/mod.rs b/src/youtube/mod.rs index b2fae8a..232490c 100644 --- a/src/youtube/mod.rs +++ b/src/youtube/mod.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{error::Error as StdError, fmt, pin::Pin, task::Poll}; +use std::{error::Error as StdError, fmt, pin::Pin, task::Poll, time::Duration}; use async_stream_lite::try_async_stream; use futures_util::{Stream, StreamExt, pin_mut, stream::BoxStream}; use pin_project_lite::pin_project; +use tokio::time::sleep; mod client; mod context; @@ -248,7 +249,67 @@ impl Chat { })) }) } - Continuation::Timed { .. } => unimplemented!("Continuation::Timed"), + Continuation::Timed { continuation, timeout_ms } => { + let continuation_token = continuation.to_string(); + let timeout = Duration::from_millis(*timeout_ms as _); + + let events: Vec = contents + .live_chat_continuation + .actions + .unwrap_or_default() + .into_iter() + .filter_map(|act| ChatEvent::from_action(&act.action)) + .collect(); + + let _ = initial_continuation; + let _ = initial_continuation_bytes; + + Ok(Self { + initial_events: events, + stream: Box::pin(try_async_stream(move |yielder| async move { + let mut continuation_token = continuation_token; + let mut timeout = timeout; + loop { + sleep(timeout).await; + + let mut continuation = context + .client + .chat_live(GetLiveChatRequest { continuation: &continuation_token }) + .await? + .with_innertube_error() + .await? + .recv_all() + .await + .map_err(ChatError::Receive)?; + let continuation: GetLiveChatResponse<'_> = simd_json::from_slice(&mut continuation)?; + let Some(contents) = continuation.continuation_contents else { + break; + }; + + for event in contents + .live_chat_continuation + .actions + .unwrap_or_default() + .into_iter() + .filter_map(|act| ChatEvent::from_action(&act.action)) + { + yielder.r#yield(event).await; + } + + let Some(Continuation::Timed { continuation: next_token, timeout_ms }) = contents.live_chat_continuation.continuations.first() + else { + break; + }; + + continuation_token.clear(); + continuation_token.push_str(next_token); + + timeout = Duration::from_millis(*timeout_ms as _); + } + Ok(()) + })) + }) + } Continuation::PlayerSeek { .. } => unreachable!("PlayerSeek shouldn't be the first continuation") } } From 9c4c8fff3ebe2414444f463873c9de145b14a564 Mon Sep 17 00:00:00 2001 From: "Carson M." Date: Sat, 13 Dec 2025 19:27:26 -0600 Subject: [PATCH 4/8] more robust action parsing --- src/youtube/mod.rs | 46 ++++++++------- src/youtube/signaler.rs | 8 +-- src/youtube/types/get_live_chat.rs | 89 +++++++++++++++++++++--------- 3 files changed, 93 insertions(+), 50 deletions(-) diff --git a/src/youtube/mod.rs b/src/youtube/mod.rs index 232490c..ca1eaab 100644 --- a/src/youtube/mod.rs +++ b/src/youtube/mod.rs @@ -17,6 +17,7 @@ use std::{error::Error as StdError, fmt, pin::Pin, task::Poll, time::Duration}; use async_stream_lite::try_async_stream; use futures_util::{Stream, StreamExt, pin_mut, stream::BoxStream}; use pin_project_lite::pin_project; +use simd_json::{BorrowedValue, derived::ValueTryAsObject}; use tokio::time::sleep; mod client; @@ -27,7 +28,7 @@ mod util; use self::{ client::ResponseExt, - signaler::SignalerChannel, + signaler::{SignalerChannel, SignalerError}, types::get_live_chat::{Continuation, GetLiveChatRequest, GetLiveChatResponse}, util::{TANGO_API_KEY, stringify_runs} }; @@ -40,7 +41,9 @@ pub use self::{ }, util::query_channel }; -use crate::youtube::signaler::SignalerError; + +#[derive(Debug)] +pub struct Author {} #[derive(Debug)] pub enum ChatEvent { @@ -48,10 +51,17 @@ pub enum ChatEvent { } impl ChatEvent { - pub(crate) fn from_action(action: &Action<'_>) -> Option { + pub(crate) fn from_action(action: BorrowedValue<'_>) -> Option { + let Ok(action) = simd_json::serde::from_refborrowed_value(&action) else { + let action_key = action.try_as_object().ok().and_then(|c| c.keys().next())?; + tracing::warn!("Encountered unknown or malformed action `{action_key}`"); + tracing::trace!("bad action: {}", simd_json::to_string(&action).as_deref().unwrap_or("")); + return None; + }; + match action { Action::AddChatItem { item, .. } => match item { - ChatItem::TextMessage { message_renderer_base, message } => message.as_ref().map(|x| ChatEvent::Message { text: stringify_runs(&x.runs) }), + ChatItem::TextMessage { base, message } => message.as_ref().map(|x| ChatEvent::Message { text: stringify_runs(&x.runs) }), _ => None }, Action::ReplayChat { .. } => unreachable!("ReplayChat should be collapsed"), @@ -105,9 +115,8 @@ impl Chat { let initial_events = contents .live_chat_continuation .actions - .unwrap_or_default() .into_iter() - .filter_map(|act| ChatEvent::from_action(&act.action)) + .filter_map(|act| ChatEvent::from_action(act.action)) .collect(); let _ = initial_continuation; let _ = initial_continuation_bytes; @@ -134,9 +143,8 @@ impl Chat { for event in contents .live_chat_continuation .actions - .unwrap_or_default() .into_iter() - .filter_map(|act| ChatEvent::from_action(&act.action)) + .filter_map(|act| ChatEvent::from_action(act.action)) { yielder.r#yield(event).await; } @@ -172,9 +180,8 @@ impl Chat { for event in contents .live_chat_continuation .actions - .unwrap_or_default() .into_iter() - .filter_map(|act| ChatEvent::from_action(&act.action)) + .filter_map(|act| ChatEvent::from_action(act.action)) { yielder.r#yield(event).await; } @@ -197,9 +204,8 @@ impl Chat { let events: Vec = contents .live_chat_continuation .actions - .unwrap_or_default() .into_iter() - .filter_map(|act| ChatEvent::from_action(&act.action)) + .filter_map(|act| ChatEvent::from_action(act.action)) .collect(); let _ = initial_continuation; @@ -229,10 +235,12 @@ impl Chat { break; }; - for action in contents.live_chat_continuation.actions.unwrap_or_default() { - if let Action::ReplayChat { actions, .. } = action.action { - events.extend(actions.into_iter().filter_map(|act| ChatEvent::from_action(&act.action))); - } + for action in contents.live_chat_continuation.actions { + let Ok(Action::ReplayChat { actions, .. }) = simd_json::serde::from_borrowed_value(action.action) else { + continue; + }; + + events.extend(actions.into_iter().filter_map(|act| ChatEvent::from_action(act.action))); } let Some(Continuation::Replay { continuation: next_token, .. }) = contents.live_chat_continuation.continuations.first() else { @@ -256,9 +264,8 @@ impl Chat { let events: Vec = contents .live_chat_continuation .actions - .unwrap_or_default() .into_iter() - .filter_map(|act| ChatEvent::from_action(&act.action)) + .filter_map(|act| ChatEvent::from_action(act.action)) .collect(); let _ = initial_continuation; @@ -289,9 +296,8 @@ impl Chat { for event in contents .live_chat_continuation .actions - .unwrap_or_default() .into_iter() - .filter_map(|act| ChatEvent::from_action(&act.action)) + .filter_map(|act| ChatEvent::from_action(act.action)) { yielder.r#yield(event).await; } diff --git a/src/youtube/signaler.rs b/src/youtube/signaler.rs index 6259396..54b94fb 100644 --- a/src/youtube/signaler.rs +++ b/src/youtube/signaler.rs @@ -192,11 +192,11 @@ impl SignalerChannel { break; }; - if event_id != "252" && event_id != "253" { - // 50, 51, and 53 are probably some internal stuff we don't care about. 252/253 seem to be correlated with new chat + if event_id != "252" && event_id != "253" && event_id != "254" { + // 50, 51, and 53 are probably some internal stuff we don't care about. 25x seem to be correlated with new chat // messages (though sometimes there aren't new chat messages at all and I'm not sure why). - // The channel starts off sending 252 but after a few seconds sends 253 instead. Not sure the difference between the - // two events, but they're both structured & function the same. + // The channel starts off sending 252 but after a few seconds sends 253 instead, and in higher volume streams gets up to + // 254. Not sure the difference between the events, but they're all structured & function the same. continue; } diff --git a/src/youtube/types/get_live_chat.rs b/src/youtube/types/get_live_chat.rs index 5aad6fa..d689752 100644 --- a/src/youtube/types/get_live_chat.rs +++ b/src/youtube/types/get_live_chat.rs @@ -24,14 +24,14 @@ pub struct GetLiveChatRequest<'s> { #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct GetLiveChatResponse<'s> { - #[serde(borrow)] + #[serde(bound = "GetLiveChatResponseContinuationContents<'s>: serde::Deserialize<'de>")] pub continuation_contents: Option> } #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct GetLiveChatResponseContinuationContents<'s> { - #[serde(borrow)] + #[serde(bound = "LiveChatContinuation<'s>: serde::Deserialize<'de>")] pub live_chat_continuation: LiveChatContinuation<'s> } @@ -39,15 +39,19 @@ pub struct GetLiveChatResponseContinuationContents<'s> { pub struct LiveChatContinuation<'s> { #[serde(borrow)] pub continuations: Vec>, - #[serde(borrow)] - pub actions: Option>> + #[serde(default)] + #[serde(bound = "Vec>: serde::Deserialize<'de>")] + pub actions: Vec> } #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct ActionContainer<'s> { - #[serde(borrow, flatten)] - pub action: Action<'s> + #[serde(rename = "clickTrackingParams")] + _tracking: Option<&'s str>, + #[serde(flatten)] + #[serde(bound = "simd_json::BorrowedValue<'s>: serde::Deserialize<'de>")] + pub action: simd_json::BorrowedValue<'s> } #[derive(Deserialize, Debug)] @@ -89,6 +93,13 @@ pub enum Action<'s> { item: ChatItem<'s>, client_id: Option<&'s str> }, + #[serde(rename = "addLiveChatTickerItemAction")] + #[serde(rename_all = "camelCase")] + AddTickerItem { + #[serde(flatten)] + #[serde(bound(deserialize = "simd_json::BorrowedValue<'s>: serde::Deserialize<'de>"))] + data: simd_json::BorrowedValue<'s> + }, #[serde(rename = "replaceChatItemAction")] #[serde(rename_all = "camelCase")] ReplaceChatItem { @@ -114,10 +125,15 @@ pub enum Action<'s> { #[serde(rename_all = "camelCase")] AddBannerToLiveChat { #[serde(flatten)] - data: simd_json::OwnedValue + #[serde(bound(deserialize = "simd_json::BorrowedValue<'s>: serde::Deserialize<'de>"))] + data: simd_json::BorrowedValue<'s> }, #[serde(rename = "liveChatReportModerationStateCommand")] - ReportModerationState(simd_json::OwnedValue) + ReportModerationState { + #[serde(flatten)] + #[serde(bound(deserialize = "simd_json::BorrowedValue<'s>: serde::Deserialize<'de>"))] + data: simd_json::BorrowedValue<'s> + } } #[derive(Deserialize, Debug, Clone)] @@ -161,7 +177,7 @@ pub enum ChatItem<'s> { #[serde(rename_all = "camelCase")] TextMessage { #[serde(borrow, flatten)] - message_renderer_base: MessageRendererBase<'s>, + base: MessageRendererBase<'s>, #[serde(borrow)] message: Option> }, @@ -169,7 +185,7 @@ pub enum ChatItem<'s> { #[serde(rename_all = "camelCase")] Superchat { #[serde(borrow, flatten)] - message_renderer_base: MessageRendererBase<'s>, + base: MessageRendererBase<'s>, #[serde(borrow)] message: Option>, #[serde(borrow)] @@ -184,7 +200,7 @@ pub enum ChatItem<'s> { #[serde(rename_all = "camelCase")] MembershipItem { #[serde(borrow, flatten)] - message_renderer_base: MessageRendererBase<'s>, + base: MessageRendererBase<'s>, #[serde(borrow)] header_sub_text: Option>, #[serde(borrow)] @@ -194,7 +210,7 @@ pub enum ChatItem<'s> { #[serde(rename_all = "camelCase")] PaidSticker { #[serde(borrow, flatten)] - message_renderer_base: MessageRendererBase<'s>, + base: MessageRendererBase<'s>, #[serde(borrow)] purchase_amount_text: UnlocalizedText<'s>, #[serde(borrow)] @@ -209,42 +225,63 @@ pub enum ChatItem<'s> { #[serde(rename = "liveChatSponsorshipsGiftPurchaseAnnouncementRenderer")] #[serde(rename_all = "camelCase")] MembershipGift { - id: String, - #[serde(flatten)] - data: simd_json::OwnedValue + id: &'s str, + #[serde(deserialize_with = "deserialize_number_from_string")] + timestamp_usec: i64, + author_external_channel_id: &'s str, + #[serde(borrow)] + header: ChatItemHeader<'s> }, #[serde(rename = "liveChatSponsorshipsGiftRedemptionAnnouncementRenderer")] #[serde(rename_all = "camelCase")] MembershipGiftRedemption { - id: String, - #[serde(flatten)] - data: simd_json::OwnedValue + #[serde(borrow, flatten)] + base: MessageRendererBase<'s>, + #[serde(borrow)] + message: Option> }, #[serde(rename = "liveChatPlaceholderItemRenderer")] #[serde(rename_all = "camelCase")] Placeholder { - id: String, + id: &'s str, #[serde(deserialize_with = "deserialize_number_from_string")] timestamp_usec: i64 }, #[serde(rename = "liveChatViewerEngagementMessageRenderer")] - ViewerEngagement { id: String }, + ViewerEngagement { id: &'s str }, #[serde(untagged)] - Unknown(simd_json::OwnedValue) + Unknown(#[serde(bound(deserialize = "simd_json::BorrowedValue<'s>: serde::Deserialize<'de>"))] simd_json::BorrowedValue<'s>) } impl ChatItem<'_> { pub fn id(&self) -> &str { match self { - ChatItem::MembershipItem { message_renderer_base, .. } => message_renderer_base.id, - ChatItem::PaidSticker { message_renderer_base, .. } => message_renderer_base.id, - ChatItem::Superchat { message_renderer_base, .. } => message_renderer_base.id, - ChatItem::TextMessage { message_renderer_base, .. } => message_renderer_base.id, + ChatItem::MembershipItem { base, .. } => base.id, + ChatItem::PaidSticker { base, .. } => base.id, + ChatItem::Superchat { base, .. } => base.id, + ChatItem::TextMessage { base, .. } => base.id, ChatItem::MembershipGift { id, .. } => id, - ChatItem::MembershipGiftRedemption { id, .. } => id, + ChatItem::MembershipGiftRedemption { base, .. } => base.id, ChatItem::Placeholder { id, .. } => id, ChatItem::ViewerEngagement { id } => id, ChatItem::Unknown(_) => "" } } } + +#[derive(Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub enum ChatItemHeader<'s> { + #[serde(rename = "liveChatSponsorshipsHeaderRenderer")] + #[serde(rename_all = "camelCase")] + Sponsorship { + #[serde(borrow)] + author_name: Option>, + #[serde(borrow)] + author_photo: ImageContainer<'s>, + #[serde(borrow)] + author_badges: Option>>, + #[serde(borrow)] + primary_text: LocalizedText<'s> + } +} From 803e7fba685ac8d3fe8022931e3d64e69b8f708a Mon Sep 17 00:00:00 2001 From: "Carson M." Date: Sat, 14 Feb 2026 17:36:59 -0600 Subject: [PATCH 5/8] implement all message types --- examples/youtube.rs | 68 ++++++-- src/youtube/mod.rs | 250 ++++++++++++++++++++++++++++- src/youtube/types/get_live_chat.rs | 12 +- src/youtube/types/mod.rs | 34 +--- 4 files changed, 306 insertions(+), 58 deletions(-) diff --git a/examples/youtube.rs b/examples/youtube.rs index ab7c411..6bcd5d2 100644 --- a/examples/youtube.rs +++ b/examples/youtube.rs @@ -12,27 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::env::args; +use std::{env::args, fmt::Write}; -use brainrot::youtube::{self, ChatEvent, RequestExecutor, Response, StreamChatMode, StreamContext}; +use brainrot::youtube::{self, ChatEvent, MembershipRedemption, RequestExecutor, Run, StreamChatMode, StreamContext, StreamStatus}; use futures_util::StreamExt; #[derive(Debug, Default)] struct ReqwestExecutor(reqwest::Client); impl RequestExecutor for ReqwestExecutor { - type Response = Respownse; + type Response = ReqwestResponse; type Error = reqwest::Error; async fn make_request(&self, req: http::Request) -> Result { - self.0.execute(req.try_into().unwrap()).await.map(Respownse) + self.0.execute(req.try_into().unwrap()).await.map(ReqwestResponse) } } #[derive(Debug)] -struct Respownse(reqwest::Response); +struct ReqwestResponse(reqwest::Response); -impl Response for Respownse { +impl youtube::Response for ReqwestResponse { type Error = reqwest::Error; fn status_code(&self) -> u16 { @@ -46,23 +46,63 @@ impl Response for Respownse { #[tokio::main] async fn main() -> anyhow::Result<()> { + let Some(channel_id) = args().nth(1) else { + anyhow::bail!("cargo run --example youtube -- "); + }; + let client = youtube::Client::::default(); - let streams = youtube::query_channel(args().nth(1).as_deref().unwrap_or("@miyukiwei"), &client).await?; + let streams = youtube::query_channel(&channel_id, &client).await?; + + let Some(stream) = streams.iter().find(|stream| stream.status() == StreamStatus::Live) else { + eprintln!("Channel has no live streams right now"); + return Ok(()); + }; - let context = StreamContext::new(client, streams[0].id(), StreamChatMode::Live).await?; + println!("Viewing {} (https://www.youtube.com/watch?v={})", stream.title(), stream.id()); + println!("{}", "=".repeat(80)); + + let context = StreamContext::new(client, stream.id(), StreamChatMode::Live).await?; let mut chat = youtube::Chat::new(context).await?; for event in chat.initial_events() { - match event { - ChatEvent::Message { text } => println!("{text}") - } + print_event(event); } while let Some(event) = chat.next().await.transpose()? { - match event { - ChatEvent::Message { text } => println!("{text}") - } + print_event(event); } Ok(()) } + +fn stringify_runs(runs: &[Run]) -> String { + let mut s = String::new(); + for run in runs { + write!(&mut s, "{run}").unwrap(); + } + s +} + +fn print_event(event: ChatEvent) { + match event { + ChatEvent::Message { author, contents, superchat, .. } => { + let text = stringify_runs(&contents); + if let Some(superchat) = superchat { + println!("{} sent {}: {}", author.name.unwrap_or(author.id), superchat.amount, text); + } else { + println!("{}: {}", author.name.unwrap_or(author.id), text); + } + } + ChatEvent::Membership { user, contents, redemption_type, .. } => { + println!( + "Membership for {}: {}{}", + user.name.unwrap_or(user.id), + stringify_runs(&contents), + if redemption_type == MembershipRedemption::Gift { " (gifted)" } else { "" } + ); + } + ChatEvent::MembershipGift { gifter, contents, .. } => { + println!("{} gifted memberships: {}", gifter.name.unwrap_or(gifter.id), stringify_runs(&contents)); + } + } +} diff --git a/src/youtube/mod.rs b/src/youtube/mod.rs index ca1eaab..a7c05a9 100644 --- a/src/youtube/mod.rs +++ b/src/youtube/mod.rs @@ -29,8 +29,8 @@ mod util; use self::{ client::ResponseExt, signaler::{SignalerChannel, SignalerError}, - types::get_live_chat::{Continuation, GetLiveChatRequest, GetLiveChatResponse}, - util::{TANGO_API_KEY, stringify_runs} + types::get_live_chat::{ChatItemHeader, Continuation, GetLiveChatRequest, GetLiveChatResponse}, + util::TANGO_API_KEY }; pub use self::{ client::{Client, ClientError, InnertubeError, RequestExecutor, Response}, @@ -39,15 +39,169 @@ pub use self::{ ImageContainer, LocalizedRun, LocalizedText, Thumbnail, UnlocalizedText, get_live_chat::{Action, ChatItem, MessageRendererBase} }, - util::query_channel + util::{ChannelStream, QueryChannelError, StreamStatus, query_channel} }; -#[derive(Debug)] -pub struct Author {} +#[derive(Debug, Clone)] +pub struct Image { + pub url: String, + pub size: Option<(u32, u32)> +} -#[derive(Debug)] +#[derive(Debug, Clone)] +pub struct AuthorBadge { + pub name: String, + pub icon: Vec, + pub icon_type: Option +} + +#[derive(Debug, Clone)] +pub struct Author { + pub id: String, + pub name: Option, + pub avatars: Vec, + pub badges: Vec +} + +impl Author { + pub(crate) fn from_message_base(base: &types::get_live_chat::MessageRendererBase) -> Self { + Author { + id: base.author_external_channel_id.to_string(), + name: base.author_name.as_ref().map(|text| text.simple_text.to_string()), + avatars: base + .author_photo + .thumbnails + .iter() + .map(|thumb| Image { + url: thumb.url.to_string(), + size: match (thumb.width, thumb.height) { + (Some(width), Some(height)) => Some((width as u32, height as u32)), + _ => None + } + }) + .collect(), + badges: base + .author_badges + .iter() + .map(|badge| AuthorBadge { + name: badge.live_chat_author_badge_renderer.tooltip.to_string(), + icon_type: badge.live_chat_author_badge_renderer.icon.as_ref().map(|icon| icon.icon_type.to_string()), + icon: badge + .live_chat_author_badge_renderer + .custom_thumbnail + .as_ref() + .map(|img| { + img.thumbnails + .iter() + .map(|thumb| Image { + url: thumb.url.to_string(), + size: match (thumb.width, thumb.height) { + (Some(width), Some(height)) => Some((width as u32, height as u32)), + _ => None + } + }) + .collect() + }) + .unwrap_or_default() + }) + .collect() + } + } +} + +#[derive(Debug, Clone)] +pub struct SuperchatMeta { + pub amount: String, + pub header_background_color: u32, + pub header_text_color: u32, + pub body_background_color: u32, + pub body_text_color: u32, + pub author_name_text_color: u32 +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum MembershipRedemption { + /// Membership was purchased by user + Purchase, + /// Membership was redeemed from a gift + Gift +} + +#[derive(Debug, Clone)] +pub enum Run { + Text(String), + Emoji { name: String, id: String, images: Vec } +} + +impl Run { + pub(crate) fn from_localized_run(run: &LocalizedRun) -> Self { + match run { + LocalizedRun::Text { text } => Run::Text(text.to_string()), + LocalizedRun::Emoji { emoji, .. } => { + let label = emoji + .image + .accessibility + .as_ref() + .expect("emojis should always have accessibility data") + .accessibility_data + .label + .to_string(); + if emoji.is_custom_emoji { + Run::Emoji { + name: label, + id: emoji.emoji_id.to_string(), + images: emoji + .image + .thumbnails + .iter() + .map(|thumb| Image { + url: thumb.url.to_string(), + size: match (thumb.width, thumb.height) { + (Some(width), Some(height)) => Some((width as u32, height as u32)), + _ => None + } + }) + .collect() + } + } else { + Run::Text(label) + } + } + } + } +} + +impl fmt::Display for Run { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Run::Text(text) => f.write_str(text), + Run::Emoji { name, .. } => f.write_fmt(format_args!(":{name}:")) + } + } +} + +#[derive(Debug, Clone)] pub enum ChatEvent { - Message { text: String } + Message { + id: String, + author: Author, + contents: Vec, + timestamp_ms: i64, + superchat: Option + }, + Membership { + id: String, + user: Author, + contents: Vec, + timestamp_ms: i64, + redemption_type: MembershipRedemption + }, + MembershipGift { + id: String, + gifter: Author, + contents: Vec, + timestamp_ms: i64 + } } impl ChatEvent { @@ -61,7 +215,87 @@ impl ChatEvent { match action { Action::AddChatItem { item, .. } => match item { - ChatItem::TextMessage { base, message } => message.as_ref().map(|x| ChatEvent::Message { text: stringify_runs(&x.runs) }), + ChatItem::TextMessage { base, message } => Some(ChatEvent::Message { + id: base.id.to_string(), + author: Author::from_message_base(&base), + contents: message + .as_ref() + .map(|text| text.runs.iter().map(Run::from_localized_run).collect()) + .unwrap_or_default(), + timestamp_ms: base.timestamp_usec / 1000, + superchat: None + }), + ChatItem::Superchat { + base, + message, + purchase_amount_text, + header_background_color, + header_text_color, + body_background_color, + body_text_color, + author_name_text_color + } => Some(ChatEvent::Message { + id: base.id.to_string(), + author: Author::from_message_base(&base), + contents: message + .as_ref() + .map(|text| text.runs.iter().map(Run::from_localized_run).collect()) + .unwrap_or_default(), + timestamp_ms: base.timestamp_usec / 1000, + superchat: Some(SuperchatMeta { + amount: purchase_amount_text.simple_text.to_string(), + header_background_color: header_background_color as _, + author_name_text_color: author_name_text_color as _, + body_background_color: body_background_color as _, + body_text_color: body_text_color as _, + header_text_color: header_text_color as _ + }) + }), + ChatItem::MembershipItem { base, header_sub_text } => Some(ChatEvent::Membership { + id: base.id.to_string(), + user: Author::from_message_base(&base), + contents: header_sub_text + .as_ref() + .map(|text| text.runs.iter().map(Run::from_localized_run).collect()) + .unwrap_or_default(), + timestamp_ms: base.timestamp_usec / 1000, + redemption_type: MembershipRedemption::Purchase + }), + ChatItem::MembershipGiftRedemption { base, message } => Some(ChatEvent::Membership { + id: base.id.to_string(), + user: Author::from_message_base(&base), + contents: message + .as_ref() + .map(|text| text.runs.iter().map(Run::from_localized_run).collect()) + .unwrap_or_default(), + timestamp_ms: base.timestamp_usec / 1000, + redemption_type: MembershipRedemption::Gift + }), + ChatItem::MembershipGift { + id, + timestamp_usec, + author_external_channel_id, + header + } => match header { + ChatItemHeader::Sponsorship { + author_name, + author_photo, + author_badges, + primary_text + } => Some(ChatEvent::MembershipGift { + id: id.to_string(), + gifter: Author::from_message_base(&MessageRendererBase { + id: author_external_channel_id, + author_name, + author_photo, + author_badges, + timestamp_usec, + author_external_channel_id + }), + contents: primary_text.runs.iter().map(Run::from_localized_run).collect(), + timestamp_ms: timestamp_usec / 1000 + }) + }, _ => None }, Action::ReplayChat { .. } => unreachable!("ReplayChat should be collapsed"), diff --git a/src/youtube/types/get_live_chat.rs b/src/youtube/types/get_live_chat.rs index d689752..b93fde7 100644 --- a/src/youtube/types/get_live_chat.rs +++ b/src/youtube/types/get_live_chat.rs @@ -163,8 +163,8 @@ pub struct MessageRendererBase<'s> { pub author_name: Option>, #[serde(borrow)] pub author_photo: ImageContainer<'s>, - #[serde(borrow)] - pub author_badges: Option>>, + #[serde(borrow, default)] + pub author_badges: Vec>, #[serde(deserialize_with = "deserialize_number_from_string")] pub timestamp_usec: i64, pub author_external_channel_id: &'s str @@ -202,9 +202,7 @@ pub enum ChatItem<'s> { #[serde(borrow, flatten)] base: MessageRendererBase<'s>, #[serde(borrow)] - header_sub_text: Option>, - #[serde(borrow)] - author_badges: Option>> + header_sub_text: Option> }, #[serde(rename = "liveChatPaidStickerRenderer")] #[serde(rename_all = "camelCase")] @@ -279,8 +277,8 @@ pub enum ChatItemHeader<'s> { author_name: Option>, #[serde(borrow)] author_photo: ImageContainer<'s>, - #[serde(borrow)] - author_badges: Option>>, + #[serde(borrow, default)] + author_badges: Vec>, #[serde(borrow)] primary_text: LocalizedText<'s> } diff --git a/src/youtube/types/mod.rs b/src/youtube/types/mod.rs index dfd26af..df67c0e 100644 --- a/src/youtube/types/mod.rs +++ b/src/youtube/types/mod.rs @@ -50,34 +50,12 @@ pub struct InnertubeRequestContextClient<'c> { pub client_name: &'c str } -#[derive(Deserialize, Debug)] -#[serde(untagged, rename_all = "camelCase")] -pub enum InnertubeResponse<'s, T> { - Error { - #[serde(borrow)] - error: InnertubeError<'s> - }, - Success { - #[serde(flatten)] - data: T - } -} - #[derive(Deserialize, Debug)] pub struct InnertubeError<'s> { - pub code: u16, pub message: &'s str, - pub errors: Vec>, pub status: &'s str } -#[derive(Deserialize, Debug)] -pub struct InnertubeErrorDetail<'s> { - pub message: &'s str, - pub domain: &'s str, - pub reason: &'s str -} - #[derive(Deserialize, Default, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct UnlocalizedText<'s> { @@ -112,11 +90,7 @@ impl fmt::Display for LocalizedRun<'_> { .expect("emojis should always have accessibility data") .accessibility_data .label; - if let Some(true) = emoji.is_custom_emoji { - f.write_fmt(format_args!(":{label}:")) - } else { - f.write_str(label) - } + if emoji.is_custom_emoji { f.write_fmt(format_args!(":{label}:")) } else { f.write_str(label) } } } } @@ -163,9 +137,11 @@ pub struct Emoji<'s> { pub shortcuts: Option>, #[serde(borrow)] pub search_terms: Option>, - pub supports_skin_tone: Option, + #[serde(default)] + pub supports_skin_tone: bool, pub image: ImageContainer<'s>, - pub is_custom_emoji: Option + #[serde(default)] + pub is_custom_emoji: bool } #[derive(Deserialize, Debug, Clone)] From a1327d16f9ac7040941435aa69fadc2ec80d8a4e Mon Sep 17 00:00:00 2001 From: "Carson M." Date: Sun, 22 Feb 2026 00:28:02 -0600 Subject: [PATCH 6/8] remove multicast --- README.md | 1 - examples/twitch.rs | 6 +-- src/lib.rs | 8 ---- src/multicast.rs | 115 --------------------------------------------- 4 files changed, 3 insertions(+), 127 deletions(-) delete mode 100644 src/multicast.rs diff --git a/README.md b/README.md index a28057b..358b025 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,6 @@ A live chat interface for Twitch & YouTube written in Rust. * ⚡ Low latency * ⏪ Supports VODs * 🔓 No authentication required -- 🤝 **Simulcast** - Receive from multiple streams & platforms simultaneously ## Usage See [`examples/twitch.rs`](https://github.com/vitri-ent/brainrot/blob/main/examples/twitch.rs) & [`examples/youtube.rs`](https://github.com/vitri-ent/brainrot/blob/main/examples/youtube.rs). diff --git a/examples/twitch.rs b/examples/twitch.rs index 37d4fe8..a45f0ea 100644 --- a/examples/twitch.rs +++ b/examples/twitch.rs @@ -14,15 +14,15 @@ use std::env::args; -use brainrot::{TwitchChat, TwitchChatEvent, twitch}; +use brainrot::twitch::{Anonymous, Chat, ChatEvent}; use futures_util::StreamExt; #[tokio::main] async fn main() -> anyhow::Result<()> { - let mut client = TwitchChat::new(args().nth(1).as_deref().unwrap_or("miyukiwei"), twitch::Anonymous).await?; + let mut client = Chat::new(args().nth(1).as_deref().unwrap_or("miyukiwei"), Anonymous).await?; while let Some(message) = client.next().await.transpose()? { - if let TwitchChatEvent::Message { user, contents, .. } = message { + if let ChatEvent::Message { user, contents, .. } = message { println!("{}: {}", user.display_name, contents.iter().map(|c| c.to_string()).collect::()); } } diff --git a/src/lib.rs b/src/lib.rs index 2ef45ef..71daa43 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,15 +16,7 @@ #[cfg(feature = "twitch")] pub mod twitch; -#[cfg(feature = "twitch")] -pub use self::twitch::{Chat as TwitchChat, ChatEvent as TwitchChatEvent, MessageSegment as TwitchMessageSegment, TwitchIdentity}; - #[cfg(feature = "youtube")] pub mod youtube; -// #[cfg(all(feature = "twitch", feature = "youtube"))] -// mod multicast; -// #[cfg(all(feature = "twitch", feature = "youtube"))] -// pub use self::multicast::{Multicast, MulticastError, VariantChat}; - pub(crate) mod util; diff --git a/src/multicast.rs b/src/multicast.rs deleted file mode 100644 index bad1578..0000000 --- a/src/multicast.rs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2024 pyke.io -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{pin::Pin, task::Poll}; - -use futures_util::Stream; -use pin_project_lite::pin_project; -use thiserror::Error; - -use crate::{twitch, youtube}; - -#[derive(Debug, Error)] -pub enum MulticastError { - #[error("{0}")] - TwitchError(irc::error::Error), - #[error("{0}")] - YouTubeError(youtube::Error) -} - -#[derive(Debug)] -pub enum VariantChat { - Twitch(twitch::ChatEvent), - YouTube(youtube::ChatEvent) -} - -pin_project! { - #[project = VariantStreamProject] - enum VariantStream { - Twitch { #[pin] x: crate::twitch::Chat }, - YouTube { #[pin] x: crate::youtube::Chat } - } -} - -impl Stream for VariantStream { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { - match self.project() { - VariantStreamProject::YouTube { x } => { - Poll::Ready(futures_util::ready!(x.poll_next(cx)).map(|x| x.map(VariantChat::YouTube).map_err(MulticastError::YouTubeError))) - } - VariantStreamProject::Twitch { x } => { - Poll::Ready(futures_util::ready!(x.poll_next(cx)).map(|x| x.map(VariantChat::Twitch).map_err(MulticastError::TwitchError))) - } - } - } -} - -impl From for VariantStream { - fn from(value: twitch::Chat) -> Self { - Self::Twitch { x: value } - } -} - -impl From for VariantStream { - fn from(value: youtube::Chat) -> Self { - Self::YouTube { x: value } - } -} - -pin_project! { - pub struct Multicast { - #[pin] - streams: Vec - } -} - -impl Multicast { - pub fn new() -> Self { - Self { streams: vec![] } - } - - pub fn push(&mut self, stream: impl Into) { - self.streams.push(stream.into()); - } - - pub async fn push_twitch(&mut self, channel: &str, auth: impl twitch::TwitchIdentity) -> Result<(), irc::error::Error> { - self.push(twitch::Chat::new(channel, auth).await?); - Ok(()) - } - - pub async fn push_youtube(&mut self, context: youtube::StreamContext) -> Result<(), youtube::Error> { - self.push(youtube::Chat::new(context).await?); - Ok(()) - } -} - -impl Stream for Multicast { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { - let mut this = self.project(); - let mut res = Poll::Ready(None); - for i in 0..this.streams.len() { - let stream = unsafe { Pin::new_unchecked(this.streams.as_mut().get_unchecked_mut().get_mut(i).unwrap()) }; - match stream.poll_next(cx) { - Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => continue, - Poll::Pending => res = Poll::Pending - } - } - res - } -} From 09a0b50706e0d6d14a1307210e37df5dee9795e2 Mon Sep 17 00:00:00 2001 From: "Carson M." Date: Sun, 22 Feb 2026 00:30:04 -0600 Subject: [PATCH 7/8] remove tokio dependency --- Cargo.toml | 1 - examples/youtube.rs | 4 ++++ src/youtube/client.rs | 4 +++- src/youtube/mod.rs | 3 +-- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3d1e396..bf667f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,6 @@ rust-version = "1.89" [dependencies] tracing = { version = "0.1", default-features = false, features = [ "std" ] } irc = { version = "1.0", optional = true, default-features = false } -tokio = { version = "1.42", default-features = false, features = [ "net" ] } futures-util = { version = "0.3", default-features = false, features = [ "std" ] } thiserror = "2.0" serde = { version = "1.0", optional = true, features = [ "derive" ] } diff --git a/examples/youtube.rs b/examples/youtube.rs index 6bcd5d2..91ada34 100644 --- a/examples/youtube.rs +++ b/examples/youtube.rs @@ -27,6 +27,10 @@ impl RequestExecutor for ReqwestExecutor { async fn make_request(&self, req: http::Request) -> Result { self.0.execute(req.try_into().unwrap()).await.map(ReqwestResponse) } + + async fn sleep(dur: std::time::Duration) { + tokio::time::sleep(dur).await + } } #[derive(Debug)] diff --git a/src/youtube/client.rs b/src/youtube/client.rs index 51e046e..ec998a4 100644 --- a/src/youtube/client.rs +++ b/src/youtube/client.rs @@ -1,4 +1,4 @@ -use std::{error::Error as StdError, fmt, future::Future, sync::OnceLock}; +use std::{error::Error as StdError, fmt, future::Future, sync::OnceLock, time::Duration}; use bytes::{Bytes, BytesMut}; use http::{HeaderMap, HeaderName, HeaderValue, Method, Request, Uri, header, request::Builder as RequestBuilder, uri::PathAndQuery}; @@ -90,6 +90,8 @@ pub trait RequestExecutor: Send + Sync + 'static { type Error: StdError + Send; fn make_request(&self, req: http::Request) -> impl Future> + Send + Sync + '_; + + fn sleep(dur: Duration) -> impl Future + Send + Sync; } #[derive(Debug)] diff --git a/src/youtube/mod.rs b/src/youtube/mod.rs index a7c05a9..97ad440 100644 --- a/src/youtube/mod.rs +++ b/src/youtube/mod.rs @@ -18,7 +18,6 @@ use async_stream_lite::try_async_stream; use futures_util::{Stream, StreamExt, pin_mut, stream::BoxStream}; use pin_project_lite::pin_project; use simd_json::{BorrowedValue, derived::ValueTryAsObject}; -use tokio::time::sleep; mod client; mod context; @@ -511,7 +510,7 @@ impl Chat { let mut continuation_token = continuation_token; let mut timeout = timeout; loop { - sleep(timeout).await; + E::sleep(timeout).await; let mut continuation = context .client From 1c1fea4c41b4cfd4136043c063274c47aa36bd27 Mon Sep 17 00:00:00 2001 From: "Carson M." Date: Sun, 22 Feb 2026 00:35:47 -0600 Subject: [PATCH 8/8] cleanup --- src/twitch/event.rs | 4 ++-- src/youtube/client.rs | 2 -- src/youtube/types/browse.rs | 2 -- src/youtube/types/get_live_chat.rs | 1 - src/youtube/util.rs | 1 - 5 files changed, 2 insertions(+), 8 deletions(-) diff --git a/src/twitch/event.rs b/src/twitch/event.rs index cfd84ad..8957ffb 100644 --- a/src/twitch/event.rs +++ b/src/twitch/event.rs @@ -121,7 +121,7 @@ pub(crate) fn to_chat_event(message: irc::proto::Message) -> Option { .tags? .into_iter() .filter(|c| c.1.is_some()) - .map(|c| (c.0, c.1.unwrap())) + .map(|c| (c.0, c.1.expect("infallible"))) .collect::>(); let (username, user_display_name) = match message.prefix? { @@ -180,7 +180,7 @@ pub(crate) fn to_chat_event(message: irc::proto::Message) -> Option { emotes.push((id.to_owned(), from, to)); } } - emotes.sort_by(|a, b| a.1.cmp(&b.1)); + emotes.sort_by_key(|a| a.1); let mut segments = Vec::with_capacity(emotes.len()); if !emotes.is_empty() { diff --git a/src/youtube/client.rs b/src/youtube/client.rs index ec998a4..41a970a 100644 --- a/src/youtube/client.rs +++ b/src/youtube/client.rs @@ -188,8 +188,6 @@ impl Client { ); headers.append(HeaderName::from_static("x-youtube-client-version"), HeaderValue::from_str(context.client_version).expect("Invalid client version")); - // Set our Accept-Language to en-US so we can properly match substrings - headers.append(header::ACCEPT_LANGUAGE, HeaderValue::from_static("en-US,en;q=0.5")); headers.append(header::USER_AGENT, HeaderValue::from_static("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:138.0) Gecko/20100101 Firefox/138.0")); // Referer is required by Signaler endpoints. headers.append(header::REFERER, HeaderValue::from_static("https://www.youtube.com/")); diff --git a/src/youtube/types/browse.rs b/src/youtube/types/browse.rs index 67a2d97..0db2da4 100644 --- a/src/youtube/types/browse.rs +++ b/src/youtube/types/browse.rs @@ -84,8 +84,6 @@ pub enum RichItemContent<'s> { VideoRenderer { #[serde(borrow)] title: LocalizedText<'s>, - // #[serde(borrow)] - // thumbnail: ImageContainer<'s>, #[serde(borrow)] thumbnail_overlays: Vec>, video_id: &'s str, diff --git a/src/youtube/types/get_live_chat.rs b/src/youtube/types/get_live_chat.rs index b93fde7..1d3117c 100644 --- a/src/youtube/types/get_live_chat.rs +++ b/src/youtube/types/get_live_chat.rs @@ -62,7 +62,6 @@ pub enum Continuation<'s> { Invalidation { #[serde(borrow)] invalidation_id: InvalidationId<'s>, - // timeout_ms: usize, continuation: &'s str }, #[serde(rename = "timedContinuationData")] diff --git a/src/youtube/util.rs b/src/youtube/util.rs index e85e871..e418ff1 100644 --- a/src/youtube/util.rs +++ b/src/youtube/util.rs @@ -127,7 +127,6 @@ pub async fn query_channel(channel_id: &str, client: &Client let video_id = video_id.to_string(); let title = stringify_runs(&title.runs); - // let thumbnail = thumbnail.thumbnails.last().map(|c| c.url); let thumbnail = format!("https://i.ytimg.com/vi/{video_id}/maxresdefault.jpg"); // 1280x720, innertube only gives us 336x118 at most match time_status {