From 9ad964263adc1acd89116c0f2380e4ff77cb990e Mon Sep 17 00:00:00 2001 From: RoboShyim Date: Sat, 31 Jan 2026 11:10:10 +0000 Subject: [PATCH] Stream non-cacheable upstream responses --- Cargo.toml | 4 ++- src/cache.rs | 83 ++++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 70 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1b7252a..c7b0ba7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] axum = { version = "0.7" } tokio = { version = "1", features = ["rt-multi-thread", "macros"] } -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "http2", "gzip", "brotli", "deflate"] } +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "http2", "gzip", "brotli", "deflate", "stream"] } http = "1" bytes = "1" serde = { version = "1", features = ["derive"] } @@ -17,6 +17,8 @@ thiserror = "1" parking_lot = "0.12" url = "2" +futures-util = "0.3" + # IP/CIDR parsing ipnet = "2" diff --git a/src/cache.rs b/src/cache.rs index cce702c..fe1aa8d 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,6 +1,7 @@ use crate::{disk, normalize, AppState}; use axum::http::{HeaderMap, Request, Uri}; use bytes::Bytes; +use futures_util::StreamExt; use parking_lot::RwLock; use std::{collections::HashMap, sync::Arc, time::Duration}; @@ -155,14 +156,33 @@ pub async fn handle_cached( } } - // miss: fetch upstream - let (status, mut resp_headers, bytes) = - fetch_upstream_raw(&state, &parts, &norm_uri, body_bytes.clone()).await?; + // miss: fetch upstream. We first inspect headers to decide whether to buffer (cacheable) + // or stream (non-cacheable / bypass). + let upstream_url = normalize::build_upstream_url(&state.cfg.origin, &norm_uri); + + let mut fwd_headers = parts.headers.clone(); + fwd_headers.insert( + http::header::HeaderName::from_static("surrogate-capability"), + http::HeaderValue::from_static("shopware=ESI/1.0"), + ); + + let up = state + .client + .request(parts.method.clone(), upstream_url) + .headers(fwd_headers) + .body(body_bytes.clone()) + .send() + .await + .map_err(|e| format!("upstream: {e}"))?; + + let status = up.status(); + let mut resp_headers = up.headers().clone(); // Decide TTL let ttl = ttl_from_headers(&resp_headers).unwrap_or(Duration::from_secs(0)); let cacheable = ttl.as_secs() > 0 - && (parts.method == http::Method::GET || parts.method == http::Method::HEAD); + && (parts.method == http::Method::GET || parts.method == http::Method::HEAD) + && !status.is_server_error(); // VCL: sw-dynamic-cache-bypass => hit-for-miss 1s if resp_headers @@ -172,25 +192,36 @@ pub async fn handle_cached( { resp_headers.remove("sw-dynamic-cache-bypass"); - // Store a short-lived hit-for-miss marker so subsequent requests bypass cache - // and we don't repeatedly attempt to cache this URL. let hfm_ttl = Duration::from_secs(1); store_hit_for_miss(&state.cache, &cache_key, &norm_uri, hfm_ttl)?; - let mut out = build_response(status, resp_headers, bytes, &norm_uri); - out.headers_mut().insert( + let stream = up + .bytes_stream() + .map(|item: Result| item.map_err(std::io::Error::other)); + let body = axum::body::Body::from_stream(stream); + + let mut out = axum::response::Response::builder() + .status(status) + .body(body) + .unwrap(); + normalize::apply_client_cache_policy(&norm_uri, &mut resp_headers); + normalize::strip_internal_headers(&mut resp_headers); + resp_headers.insert( http::header::HeaderName::from_static("x-codycache"), http::HeaderValue::from_static("BYPASS"), ); + *out.headers_mut() = resp_headers; return Ok(out); } - let mut cache_status = if banned { "BAN" } else { "MISS" }; - if cacheable { - // Strip Set-Cookie on cacheable responses - resp_headers.remove(http::header::SET_COOKIE); + // Buffer and store + let bytes = up + .bytes() + .await + .map_err(|e| format!("upstream body: {e}"))?; + resp_headers.remove(http::header::SET_COOKIE); store( &state.cache, &cache_key, @@ -200,14 +231,32 @@ pub async fn handle_cached( &bytes, ttl, )?; - cache_status = if banned { "BAN" } else { "MISS" }; + + let mut out = build_response(status, resp_headers, bytes, &norm_uri); + out.headers_mut().insert( + http::header::HeaderName::from_static("x-codycache"), + http::HeaderValue::from_static(if banned { "BAN" } else { "MISS" }), + ); + return Ok(out); } - let mut out = build_response(status, resp_headers, bytes, &norm_uri); - out.headers_mut().insert( + // Non-cacheable: stream through + let stream = up + .bytes_stream() + .map(|item: Result| item.map_err(std::io::Error::other)); + let body = axum::body::Body::from_stream(stream); + + let mut out = axum::response::Response::builder() + .status(status) + .body(body) + .unwrap(); + normalize::apply_client_cache_policy(&norm_uri, &mut resp_headers); + normalize::strip_internal_headers(&mut resp_headers); + resp_headers.insert( http::header::HeaderName::from_static("x-codycache"), - http::HeaderValue::from_static(cache_status), + http::HeaderValue::from_static(if banned { "BAN" } else { "MISS" }), ); + *out.headers_mut() = resp_headers; Ok(out) } @@ -349,6 +398,8 @@ async fn fetch_upstream_raw( Ok((status, resp_headers, bytes)) } +// (streaming helper removed; streaming is implemented inline where needed) + fn store( cache: &Cache, key: &str,