feat: add bidirectional live streaming to Runner and LlmAgent#325
Open
tSte wants to merge 8 commits into
Open
Conversation
Implements Runner.runLive plus the underlying LlmAgent and connection plumbing so audio/video bidi sessions work end-to-end, bringing parity with adk-python's run_live. - Runner.runLive drives a live invocation via a LiveRequestQueue, defaults responseModalities to AUDIO, auto-enables transcription configs for multi-agent setups, and runs the plugin lifecycle. - LlmAgent.runLiveFlow runs the same preprocessors as runAsync, opens llm.connect, drains the live request queue on a parallel send loop, yields events from the receive loop, ferries function responses back to the open websocket, and recurses on transfer_to_agent. - GeminiLlmConnection bridges the GenAI Session callbacks into an AsyncGenerator, aggregates text, transcriptions, tool calls, session resumption updates, turn-complete and interruption signals. - BaseLlmConnection gains optional sendActivityStart/sendActivityEnd for manual activity boundary signalling. - BaseAgent.runLive now mirrors runAsync (before/after callbacks, abort handling, otel span) and dispatches to runLiveImpl. - Live model audio events with inline data are yielded but not persisted to the session to avoid storing raw blobs; transcription, tool, and usage events are persisted as in runAsync. - Adds 6 unit tests covering missing queue, missing session, blob forwarding, default modalities, audio-blob persistence skip, and function-call round-tripping.
Required for Gemini 3.1 realtime preview, which ignores
sendClientContent text turns and expects audio/video routed via
sendRealtimeInput.audio / sendRealtimeInput.video instead of media.
- sendRealtime now branches on blob.mimeType: audio/* uses
{audio: blob}, image/* uses {video: blob}, otherwise falls back
to the existing {media: blob} path.
- sendContent forwards a single user text part via
sendRealtimeInput.text so it interleaves with the audio stream.
Multi-part user content and non-user content keep using
sendClientContent.
- receive no longer breaks after turnComplete. The same call now
spans an entire conversation, surfacing turnComplete as an
in-stream signal and continuing until the websocket closes
(kind: 'close') or the consumer closes the connection.
Adds 12 unit tests in core/test/models/gemini_llm_connection_test.ts
covering sendRealtime mime routing, sendContent text/multi-part/
function-response paths, and the multi-turn receive() flow including
close and error termination.
|
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request. |
Author
|
@kalenkevich @Varun-S10 could you please review? |
…model-version-aware routing) Brings TS Runner.runLive in line with the Python reference (base_llm_flow.run_live): - InvocationContext.liveSessionResumptionHandle captures the most recent newHandle from sessionResumptionUpdate events. - runLiveFlow now wraps the connect/sendHistory/sendLoop/receiveLoop in an outer reconnect loop. On goAway or recoverable connection errors, reconnects with handle set in liveConnectConfig.sessionResumption and skips sendHistory because the server already holds the state. Capped at MAX_LIVE_RECONNECT_ATTEMPTS (5). - GeminiLlmConnection.receive() yields goAway events; runReceiveLoop throws LiveReconnectSignal to break out and reconnect. - sendContent and sendRealtime routing gated by isGemini31FlashLive(), matching the Python flow. Pre-3.1 models go through sendClientContent and the generic media: blob channel. - Connection now carries the model version so the gating works. - Sub-agent transfer pauses 1s before tearing down the parent connection (matches Python's DEFAULT_TRANSFER_AGENT_DELAY) and clears the parent resumption handle so the child starts a fresh live session. - runSendLoop cooperatively yields between dispatches. Tests added: handle capture, reconnect-with-handle on goAway, no-reconnect without handle, history skipped on reconnect, goAway/resumption events yielded from receive(), Gemini 3.1 vs pre-3.1 routing for sendContent and sendRealtime, sendHistory turnComplete behaviour for user/model trailing turns.
Allows callers that recreate the Runner per conversation cycle (e.g. live bridges that re-init on agent handoff) to propagate the captured resumption handle across cycles. The handle is wired through to InvocationContext at construction, so the existing runLiveFlow logic picks it up and skips client-side history replay on connect.
- LiveRequestQueue.get() accepts an optional AbortSignal; a parked read is released and its waiter removed on abort, instead of stranding a waiter that would later consume (and drop) a request. - runSendLoop reads via the abortable get(), so abort() stops it promptly rather than only between frames. - Abort the parent send loop before an agent transfer so it no longer races the sub-agent for the shared liveRequestQueue. - combineAbortSignals listeners auto-remove on teardown, so they do not accumulate on the invocation signal across reconnects. - receive(): drop unreachable toolCallParts flushes; document the postprocessLive single-field-response invariant.
Author
|
@Varun-S10 @kalenkevich I added some changes to further handle streaming. I tried to mimic Python SDK as much as it made sense (it is my understanding that all ADKs should behave in similar fashion). |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Please ensure you have read the contribution guide before creating a pull request.
Link to Issue or Description of Change
1. Link to an existing issue (if applicable):
2. Or, if no issue exists, describe the change:
Problem:
adk-jscurrently lacks real-time bidirectional audio/video streaming, whileadk-pythonexposes the equivalent feature viaRunner.run_live. The TScodebase has had
// TODO - b/425992518: Implement runLive and related methods.inrunner.tssince the initial release,BaseAgent.runLivethrows
Live mode is not implemented yet.,LlmAgent.runLiveFlowthrowsLlmAgent.runLiveFlow not implemented, andGeminiLlmConnection.receivethrows
Not Implemented.. TheRunConfiginterface already exposesresponseModalitiesandspeechConfig, so the configuration layer existed— only the
runLive/LiveRequestQueueplumbing was missing.Solution:
Implement
Runner.runLiveend-to-end with the same surface area as thePython counterpart, including the supporting
LlmAgent,BaseAgent, andGemini live connection plumbing.
Runner.runLivedrives a live invocation via aLiveRequestQueue,defaults
responseModalitiestoAUDIO, auto-enables transcriptionconfigs for multi-agent setups, and runs the plugin lifecycle
(
beforeRun/onEvent/afterRun).LlmAgent.runLiveFlowruns the same preprocessors asrunAsync, opensllm.connect, drains the live request queue on a parallel send loop,yields events from the receive loop, ferries function responses back
to the open websocket, and recurses on
transfer_to_agent.GeminiLlmConnectionbridges the GenAISessioncallbacks into anAsyncGenerator, aggregating text, transcriptions, tool calls, sessionresumption updates, and turn-complete / interruption signals.
GeminiLlmConnection.sendRealtimeroutes by mime type —audio/*→sendRealtimeInput.audio,image/*→sendRealtimeInput.video,otherwise the existing
mediapath.sendContentforwards a singleuser text part via
sendRealtimeInput.textso it interleaves with theaudio stream. Both are required for the Gemini 3.1 realtime preview,
which ignores
sendClientContenttext turns and rejects audio senton the legacy
mediachannel.GeminiLlmConnection.receiveno longer terminates onturnComplete.A single
receive()call now spans an entire conversation: itsurfaces
turnCompleteas an in-stream signal and keeps iteratinguntil the websocket closes or the consumer closes the connection.
inlineDataare yielded but skippedwhen persisting (matching Python). Transcription, tool, and usage
events are persisted as in
runAsync.Testing Plan
Unit Tests:
All 18 unit tests added by this PR pass. The 17 failing tests are
pre-existing in
core/test/sessions/database_session_service_test.tsand
core/test/sessions/db/operations_test.ts; they fail because thelocal sandbox is missing a built
sqlite3native binding and areunrelated to this change. The same suite is green on machines where
the binding is built.
core/test/runner/run_live_test.tsadds six tests:runLivethrows whenliveRequestQueueis missing.runLivethrows when the session does not exist.model events (audio, text,
turnComplete) are yielded back.responseModalitiesdefaults toAUDIOand is propagated intoliveConnectConfig.inlineDataare NOT persisted to thesession, while transcription events ARE persisted.
response is sent back to the model over the open connection.
core/test/models/gemini_llm_connection_test.tsadds twelve tests:sendRealtimeroutesaudio/*blobs throughsendRealtimeInput.audio.sendRealtimeroutesimage/*blobs throughsendRealtimeInput.video.sendRealtimefalls back tosendRealtimeInput.mediafor othermime types.
sendRealtimefalls back tomediawhen mime type is missing.sendContentsends a single user text part viasendRealtimeInput.text.sendContentusessendClientContentfor multi-part user content.sendContentusessendClientContentfor non-user roles.sendContentroutes function responses throughsendToolResponse.sendContentthrows when content has no parts.receivekeeps iterating afterturnCompleteand surfaces eventsfor subsequent turns.
receiveterminates cleanly onclose.receiverethrows on websocket error.The full integration suite also passes:
npm run lint,npm run format:check, andnpm run docs:checkare clean.Manual End-to-End (E2E) Tests:
The reproducer below mirrors the Python
bidi-demo
sample but uses the Gemini 3.1 realtime preview model. It confirms that:
LiveRequestQueuereach the live model.runLive.turnCompleteflow through.Run with
GOOGLE_GENAI_API_KEY(or Vertex equivalents) set.Reviewers can also exercise this by porting the Python
bidi-demoagentscript to TS — the public
Runner.runLivesignature is intentionallycompatible.
Checklist
Additional context
This brings TS ADK to parity with
adk-pythonfor bidi live streaming.The implementation closely follows the Python design but diverges in two
places that matter for reviewers:
GenAI SDK bridging. The Python
genaiSDK exposessession.receive()as an async iterator. The JS SDK only providescallbacks (
onmessage/onerror/onclose). The newIncomingMessageBufferingemini_llm_connection.tsadapts thecallback model into a back-pressured async generator so
BaseLlmConnection.receive()keeps its existing contract.Function response routing. Python pushes function responses back
through
LiveRequestQueueso the user-owned send loop can fan them outto active streaming tools. In TS, function responses are sent directly
via
connection.sendContentfrom the receive loop. This avoids a racein which the user-owned queue is closed but the open websocket still
needs to ferry a tool result to the model. Active streaming tool
fan-out can be added later without changing the public API.
Files touched:
core/src/runner/runner.ts— addsRunner.runLive.core/src/agents/base_agent.ts—runLive()mirrorsrunAsync(callbacks, abort, otel) and dispatches to
runLiveImpl.core/src/agents/llm_agent.ts— implementsrunLiveFlow,runLivePreprocess,runSendLoop,dispatchLiveRequest,runReceiveLoop,postprocessLive,isUserAuthoredResponse.core/src/models/base_llm_connection.ts— adds optionalsendActivityStart/sendActivityEnd.core/src/models/gemini_llm_connection.ts— fullreceive()implementation; bridges
Sessioncallbacks viaIncomingMessageBuffer;Gemini 3.1-compatible
sendRealtimemime routing; single-text-partuser content over
sendRealtimeInput.text;receiveno longerbreaks on
turnComplete.core/src/models/google_llm.ts—Gemini.connectwiresonmessage/onerror/oncloseinto the buffer.core/test/runner/run_live_test.ts— new unit tests forrunLive.core/test/models/gemini_llm_connection_test.ts— new unit tests forGeminiLlmConnection.@ScottMansfield