From 35b36ef481e84bf9abbf78fc6a5656ff9e50ffc7 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 12 Jan 2026 17:03:24 +0300 Subject: [PATCH] Analysis streaming --- locales/analyze.yml | 16 +++++++- src/telegram/actions/analyze.rs | 73 +++++++++++++++++++++++++++++++-- 2 files changed, 83 insertions(+), 6 deletions(-) diff --git a/locales/analyze.yml b/locales/analyze.yml index 5a8ec497..490dcdb8 100644 --- a/locales/analyze.yml +++ b/locales/analyze.yml @@ -77,12 +77,24 @@ analysis.waiting: 🎵 %{track_name} Album: %{album_name} - ⏳ Wait for analysis to finish 🔍 + ⏳ Wait for analysis to start 🔍 ru: |- 🎵 %{track_name} Альбом: %{album_name} - ⏳ Ждите завершения анализа 🔍 + ⏳ Ждите старта анализа 🔍 + +analysis.partial-result: + en: |- + 🎵 %{track_name} + Album: %{album_name} + + %{analysis_result} ⏳ + ru: |- + 🎵 %{track_name} + Альбом: %{album_name} + + %{analysis_result} ⏳ analysis.failed: en: |- diff --git a/src/telegram/actions/analyze.rs b/src/telegram/actions/analyze.rs index 70482ad9..2c659b68 100644 --- a/src/telegram/actions/analyze.rs +++ b/src/telegram/actions/analyze.rs @@ -3,12 +3,14 @@ use async_openai::types::chat::{ CreateChatCompletionRequestArgs, }; use backon::{ExponentialBuilder, Retryable}; +use futures::StreamExt; use itertools::Itertools; use rspotify::model::TrackId; use teloxide::payloads::{AnswerCallbackQuerySetters as _, EditMessageTextSetters as _}; use teloxide::prelude::Requester as _; use teloxide::sugar::bot::BotMessagesExt as _; use teloxide::types::{CallbackQuery, InlineKeyboardMarkup, Message, ParseMode}; +use tokio::time::{Duration, Instant}; use crate::app::{AIConfig, App}; use crate::profanity; @@ -162,13 +164,76 @@ async fn perform( .messages([ChatCompletionRequestUserMessage::from(prompt.as_ref()).into()]) .build()?; - let response = config.openai_client().chat().create(request).await?; + let analysis_result = { + const UPDATE_INTERVAL: Duration = Duration::from_millis(500); - let choices = response.choices.first(); + let mut stream = config.openai_client().chat().create_stream(request).await?; + let mut last_update = Instant::now(); + let mut accumulator = String::new(); - let Some(choice) = choices else { return Ok(()) }; + // Streaming loop + while let Some(response) = stream.next().await { + let response = response?; - let analysis_result = choice.message.content.clone().unwrap_or_default(); + // Start takes some time. So it's better to initialize on first token + if accumulator.is_empty() { + last_update = Instant::now(); + } + + if let Some(choice) = response.choices.first() + && let Some(delta_content) = &choice.delta.content + { + accumulator.push_str(delta_content); + } + + let should_update = last_update.elapsed() >= UPDATE_INTERVAL; + + if should_update && !accumulator.is_empty() { + let mut display_text = t!( + "analysis.partial-result", + locale = state.locale(), + track_name = track.track_tg_link(), + album_name = track.album_tg_link(), + analysis_result = accumulator, + ); + + let mut hit_limit = false; + + if display_text.chars_len() > MESSAGE_MAX_LEN { + let template_overhead = display_text.chars_len() - accumulator.chars_len(); + let max_analysis_len = MESSAGE_MAX_LEN - template_overhead; + + display_text = t!( + "analysis.partial-result", + locale = state.locale(), + track_name = track.track_tg_link(), + album_name = track.album_tg_link(), + analysis_result = accumulator.chars_crop(max_analysis_len), + ); + + hit_limit = true; + } + + app.bot() + .edit_text(message, display_text) + .link_preview_options(link_preview_small_top(track.url())) + .parse_mode(ParseMode::Html) + .await?; + + if hit_limit { + break; + } + + last_update = Instant::now(); + } + } + + if accumulator.is_empty() { + anyhow::bail!("No analysis content received"); + } + + accumulator + }; let status = TrackStatusService::get_status(app.db(), state.user_id(), track.id()).await; let keyboard = InlineButtons::from_track_status(status, track.id(), state.locale());