From fe1688404e264d0bae95ec7c93af26b11c9b905f Mon Sep 17 00:00:00 2001 From: Mathieu Piton <27002047+mpiton@users.noreply.github.com> Date: Sun, 26 Apr 2026 14:16:48 +0200 Subject: [PATCH 1/6] feat(download): dynamic segment splitting on slow tail MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a parallel segment finishes before its peers, the engine now picks the slowest still-running segment whose remaining range exceeds `dynamic_split_min_remaining_mb` (default 4 MiB), shrinks it in place, and spawns a fresh worker for the upper half. Backend additions: - domain `Segment::split(at_byte)` validation method (state must be `Downloading`, split point strictly inside the unfetched range) - `DomainEvent::SegmentSplit { download_id, original_segment_id, new_segment_id, split_at }` forwarded as `segment-split` Tauri event - `AppConfig.dynamic_split_enabled` / `dynamic_split_min_remaining_mb` wired through `ConfigDto`, `ConfigPatch`, `ConfigPatchDto` - `SegmentedDownloadEngine::with_dynamic_split(enabled, min_remaining_mb)` builder consumed at startup - segment worker accepts upper bound through `tokio::sync::watch::Receiver`, re-reads it before chunk fetch and after each network read so a mid-flight shrink clamps writes to the new boundary - per-segment progress exposed via `Arc` so engine picks slowest candidate by throughput - atomic `.vortex-meta` rewrite after each split so resume after a crash mid-split sees a consistent topology (PRD §7.1, task 17) --- CHANGELOG.md | 1 + .../driven/config/toml_config_store.rs | 6 + .../src/adapters/driven/event/tauri_bridge.rs | 14 + .../driven/logging/download_log_bridge.rs | 13 + .../driven/network/download_engine.rs | 364 +++++++++++++++++- .../adapters/driven/network/segment_worker.rs | 157 ++++++-- src-tauri/src/adapters/driving/tauri_ipc.rs | 4 + src-tauri/src/domain/event.rs | 10 + src-tauri/src/domain/model/config.rs | 40 ++ src-tauri/src/domain/model/segment.rs | 86 +++++ src-tauri/src/lib.rs | 21 +- 11 files changed, 683 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9869ac1..918af5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Dynamic segment splitting (PRD-v2 P0.17, task 17): when a parallel segment finishes before its peers, the engine now re-evaluates the still-running segments, picks the slowest one whose remaining range exceeds `dynamic_split_min_remaining_mb` (default 4 MiB) and shrinks it in place — a fresh worker takes the upper half so the tail of the download accelerates instead of stalling on a single slow connection. Backend ships a domain-pure `Segment::split(at_byte)` validation method (state must be `Downloading`, split point strictly inside the unfetched range), a new `DomainEvent::SegmentSplit { download_id, original_segment_id, new_segment_id, split_at }` forwarded as the `segment-split` Tauri event and logged in the per-download log store, two new `AppConfig` / `ConfigPatch` fields `dynamic_split_enabled` (default `true`) and `dynamic_split_min_remaining_mb` (default `4`) wired through the toml config store and the Tauri IPC `ConfigPatchDto`, plus a `SegmentedDownloadEngine::with_dynamic_split(enabled, min_remaining_mb)` builder consumed at startup so the runtime engine reflects the persisted config without a restart. 
The segment worker accepts the upper bound through a `tokio::sync::watch::Receiver` instead of a frozen `u64`, re-reads it before each chunk fetch and again after every successful network read so a mid-flight shrink clamps the next write to the new boundary; per-segment progress is exposed via an `Arc` so the engine can pick the slowest candidate by throughput (`downloaded / elapsed`). After every split, the engine atomically rewrites `.vortex-meta` with the updated segment topology so resume after a crash mid-split sees a consistent state. (task 17) - "Report broken plugin" action (PRD-v2 P0.16, task 16): plugins listed in *Plugins → Plugin Store* now expose a *Report broken plugin* item in their kebab menu. Clicking it opens the user's default browser at a pre-filled GitHub issue on the plugin's repository, with diagnostic metadata (plugin name + version, Vortex version, OS, optional URL under test, last 50 log lines) inlined into the issue body. Backend adds a `repository_url` field to `domain::model::plugin::PluginInfo` (parsed from the new `[plugin].repository` key in `plugin.toml`), a `domain::ports::driven::UrlOpener` port plus its platform-native `SystemUrlOpener` adapter (`xdg-open` / `open` / `cmd start`, `http(s)://` only by validation), the std-only `domain::model::plugin::build_report_broken_url` URL builder (RFC 3986 unreserved-set percent encoder, last 50 log lines, GitHub-only repository hosts, accepts `.git` suffix, rejects malformed URLs with `DomainError::ValidationError`), and a `ReportBrokenPluginCommand` handler that returns `AppError::Validation` when a manifest carries no `repository_url`. New Tauri IPC `plugin_report_broken(pluginName, logLines?, testedUrl?) → string` returns the issue URL so the UI can fall back to clipboard copy if the launcher fails. i18n (en/fr): `plugins.action.reportBroken`, `plugins.toast.reportBrokenSuccess`, `plugins.toast.reportBrokenError`. (task 16) - Dynamic plugin configuration UI (PRD-v2 P0.15, task 15): plugins declaring a `[config]` block in their `plugin.toml` now expose their schema at runtime. Backend adds `ConfigField` / `ConfigFieldType` / `PluginConfigSchema` to `domain/model/plugin.rs` (typed validation, enum options, `min`/`max` bounds, regex via a std-only matcher — no external import in the domain), a `PluginConfigStore` port (`get_values` / `set_value` / `list_all` / `delete_all`) implemented by `SqlitePluginConfigRepo` backed by the new `plugin_configs (plugin_name, key, value)` table (migration `m20260425_000005_create_plugin_configs`, composite primary key). The manifest parser (`adapters/driven/plugin/manifest.rs`) now extracts `type`, `default`, `options`, `description`, `min`, `max`, `regex` on top of the existing defaults, and rejects defaults that fail their own field validation. CQRS gains `UpdatePluginConfigCommand` (validates against the schema, applies the runtime first then persists, rolls back on failure) and `GetPluginConfigQuery` (returns the schema plus persisted values, dropping any persisted entry that no longer matches the current schema and falling back to manifest defaults). `PluginLoader` is extended with `get_manifest()` and `set_runtime_config()`; `ExtismPluginLoader` implements both by reading from `PluginRegistry` and writing to `SharedHostResources::plugin_configs`, so `get_config(key)` calls from the WASM plugin observe the new value without a reload. At startup, `lib.rs` replays persisted configs onto the in-memory map before plugins are loaded. 
Frontend adds two components: `PluginConfigField.tsx` (dispatcher renderer: `string` → text input, `boolean` → shadcn switch, `integer`/`float` → numeric input with bounds, `url` → url input, `enum` (and `string` with options) → shadcn select; `aria-describedby` on the control points to the error message) and `PluginConfigDialog.tsx` (loads the schema via `useQuery`, validates each field on the UI side (rejects empty floats, validates JSON arrays) before sending, persists changed values sequentially, guards the schema-reset effect while a save is in flight to avoid clobbering the draft, invalidates the query on success). `PluginsView` queries `plugin_config_get` for each installed plugin (keyed off the unfiltered installed list to avoid churn while typing in search) to decide whether the *Configure* button (Settings icon, next to the *More* menu) should render: a plugin without `[config]` exposes no button. New IPC commands `plugin_config_get(name) → PluginConfigView` and `plugin_config_update(name, key, value)`. i18n (en/fr): `plugins.action.configure`, `plugins.config.{title,description,loading,error,noFields,toast.{saveSuccess,validationFailed}}`. (task 15) - History retention with automatic daily purge (PRD-v2 P0.14, task 14): new `history_retention_days` setting (default 30, presets 7 / 30 / 90 / 365 / `0 = unlimited`) exposed in the *General* Settings tab as a `Select` dropdown wired to `settings_update`. Backend ships a `Clock` domain port (`SystemClock` adapter under `adapters/driven/scheduler/`) and a `HistoryPurgeWorker` daemon spawned during Tauri setup that hard-deletes `history` rows where `completed_at < now - retention_days * 86_400`. The worker persists its last run as a Unix-epoch timestamp inside `/.history_purge_state` (sentinel filename `HISTORY_PURGE_STATE_FILE`). On startup, the daemon reads the sentinel and either runs immediately (missing/stale) or sleeps for `SECS_PER_DAY - elapsed` so the first post-launch purge stays anchored to the previous successful run instead of drifting up to ~47h after a restart; the recurring loop then ticks every 24h via `tokio::time::interval` with `MissedTickBehavior::Skip`. `retention_days <= 0` is a no-op that does not write the sentinel, so the next run re-fires the moment the user re-enables retention; corrupt sentinels are treated as "never ran" so a stuck file never blocks the scheduler. The worker shares the same `Arc` and `Arc` the IPC layer already mutates, so a settings change is observed without restart. Domain helper `normalize_history_retention_days` clamps negatives back to `0` and is now applied at every write boundary — `apply_patch` (so a crafted `settings_update` payload cannot persist a negative) and `From for AppConfig` (so a hand-edited `config.toml` is normalized at load) — plus the worker itself for defense-in-depth. 
(task 14) diff --git a/src-tauri/src/adapters/driven/config/toml_config_store.rs b/src-tauri/src/adapters/driven/config/toml_config_store.rs index fcea4bf..4489f05 100644 --- a/src-tauri/src/adapters/driven/config/toml_config_store.rs +++ b/src-tauri/src/adapters/driven/config/toml_config_store.rs @@ -156,6 +156,8 @@ struct ConfigDto { retry_delay_seconds: u32, verify_checksums: bool, pre_allocate_space: bool, + dynamic_split_enabled: bool, + dynamic_split_min_remaining_mb: u64, // History history_retention_days: i64, @@ -211,6 +213,8 @@ impl From for ConfigDto { retry_delay_seconds: c.retry_delay_seconds, verify_checksums: c.verify_checksums, pre_allocate_space: c.pre_allocate_space, + dynamic_split_enabled: c.dynamic_split_enabled, + dynamic_split_min_remaining_mb: c.dynamic_split_min_remaining_mb, history_retention_days: c.history_retention_days, proxy_type: c.proxy_type, proxy_url: c.proxy_url, @@ -251,6 +255,8 @@ impl From for AppConfig { retry_delay_seconds: d.retry_delay_seconds, verify_checksums: d.verify_checksums, pre_allocate_space: d.pre_allocate_space, + dynamic_split_enabled: d.dynamic_split_enabled, + dynamic_split_min_remaining_mb: d.dynamic_split_min_remaining_mb, history_retention_days: normalize_history_retention_days(d.history_retention_days), proxy_type: d.proxy_type, proxy_url: d.proxy_url, diff --git a/src-tauri/src/adapters/driven/event/tauri_bridge.rs b/src-tauri/src/adapters/driven/event/tauri_bridge.rs index 41dfdf4..9ade8ec 100644 --- a/src-tauri/src/adapters/driven/event/tauri_bridge.rs +++ b/src-tauri/src/adapters/driven/event/tauri_bridge.rs @@ -50,6 +50,7 @@ fn event_name(event: &DomainEvent) -> &'static str { DomainEvent::SegmentStarted { .. } => "segment-started", DomainEvent::SegmentCompleted { .. } => "segment-completed", DomainEvent::SegmentFailed { .. } => "segment-failed", + DomainEvent::SegmentSplit { .. } => "segment-split", DomainEvent::PluginLoaded { .. } => "plugin-loaded", DomainEvent::PluginUnloaded { .. } => "plugin-unloaded", DomainEvent::PackageCreated { .. } => "package-created", @@ -116,6 +117,19 @@ fn event_payload(event: &DomainEvent) -> serde_json::Value { } => { json!({ "downloadId": download_id.0, "segmentId": segment_id, "error": error }) } + DomainEvent::SegmentSplit { + download_id, + original_segment_id, + new_segment_id, + split_at, + } => { + json!({ + "downloadId": download_id.0, + "originalSegmentId": original_segment_id, + "newSegmentId": new_segment_id, + "splitAt": split_at, + }) + } DomainEvent::PluginLoaded { name, version } => { json!({ "name": name, "version": version }) diff --git a/src-tauri/src/adapters/driven/logging/download_log_bridge.rs b/src-tauri/src/adapters/driven/logging/download_log_bridge.rs index b0115de..7393983 100644 --- a/src-tauri/src/adapters/driven/logging/download_log_bridge.rs +++ b/src-tauri/src/adapters/driven/logging/download_log_bridge.rs @@ -84,6 +84,19 @@ fn record_download_event(store: &DownloadLogStore, event: &DomainEvent) { format!("[ERROR] Segment {segment_id} failed: {error}"), ); } + DomainEvent::SegmentSplit { + download_id, + original_segment_id, + new_segment_id, + split_at, + } => { + store.push( + download_id.0, + format!( + "[INFO] Segment {original_segment_id} split at byte {split_at}; new segment {new_segment_id} took the upper half" + ), + ); + } DomainEvent::ChecksumVerified { id, algorithm, .. 
} => {
            store.push(id.0, format!("[INFO] {algorithm} checksum verified"));
        }
diff --git a/src-tauri/src/adapters/driven/network/download_engine.rs b/src-tauri/src/adapters/driven/network/download_engine.rs
index 43e5999..68bd3e2 100644
--- a/src-tauri/src/adapters/driven/network/download_engine.rs
+++ b/src-tauri/src/adapters/driven/network/download_engine.rs
@@ -1,6 +1,6 @@
 use std::collections::HashMap;
-use std::path::PathBuf;
-use std::sync::atomic::AtomicU64;
+use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 
 use tokio::sync::watch;
@@ -10,6 +10,7 @@
 use tokio_util::sync::CancellationToken;
 
 use crate::domain::error::DomainError;
 use crate::domain::event::DomainEvent;
 use crate::domain::model::download::{Download, DownloadId};
+use crate::domain::model::meta::{DownloadMeta, SegmentMeta};
 use crate::domain::ports::driven::{DownloadEngine, EventBus, FileStorage};
 
 use super::format_error_chain;
@@ -20,12 +21,124 @@ struct ActiveDownload {
     pause_sender: watch::Sender<bool>,
 }
 
+/// Runtime state of one in-flight segment, tracked by the engine so it can
+/// shrink the segment's range and observe its throughput for dynamic split.
+struct SegmentRuntimeState {
+    end_tx: watch::Sender<u64>,
+    progress: Arc<AtomicU64>,
+    started_at: std::time::Instant,
+    start_byte: u64,
+    initial_end: u64,
+}
+
+/// Pick the slowest active segment whose remaining range is large enough
+/// to benefit from a split. Returns the slot index and the byte at which
+/// to split (midpoint of the remaining range).
+fn pick_split_target(
+    segments: &[Option<SegmentRuntimeState>],
+    min_remaining_bytes: u64,
+) -> Option<(usize, u64)> {
+    let mut slowest: Option<(usize, f64, u64)> = None;
+    for (idx, slot) in segments.iter().enumerate() {
+        let Some(state) = slot else { continue };
+        if state.initial_end == u64::MAX {
+            continue; // unbounded segments cannot be split
+        }
+        let downloaded = state.progress.load(Ordering::Relaxed);
+        let current_offset = state.start_byte.saturating_add(downloaded);
+        if current_offset >= state.initial_end {
+            continue; // already at end — completion event will fire shortly
+        }
+        let remaining = state.initial_end - current_offset;
+        if remaining < min_remaining_bytes.max(1) {
+            continue;
+        }
+        let split_at = current_offset.saturating_add(remaining / 2);
+        if split_at <= current_offset || split_at >= state.initial_end {
+            continue;
+        }
+        let elapsed = state.started_at.elapsed().as_secs_f64().max(1e-3);
+        let bps = downloaded as f64 / elapsed;
+        match slowest {
+            None => slowest = Some((idx, bps, split_at)),
+            Some((_, prev_bps, _)) if bps < prev_bps => {
+                slowest = Some((idx, bps, split_at));
+            }
+            _ => {}
+        }
+    }
+    slowest.map(|(idx, _, split_at)| (idx, split_at))
+}
+
+/// Atomically rewrite `.vortex-meta` after a dynamic split so resume after a
+/// crash sees the updated segment topology. A failure here only logs — the
+/// in-memory split is still valid for the live download.
+async fn persist_split_meta(
+    file_storage: &Arc<dyn FileStorage>,
+    dest_path: &Path,
+    download_id: DownloadId,
+    url: &str,
+    total_size: u64,
+    active_segments: &[Option<SegmentRuntimeState>],
+) {
+    let segments_meta: Vec<SegmentMeta> = active_segments
+        .iter()
+        .enumerate()
+        .filter_map(|(i, slot)| {
+            slot.as_ref().map(|st| SegmentMeta {
+                id: i as u32,
+                start_byte: st.start_byte,
+                end_byte: st.initial_end,
+                downloaded_bytes: st.progress.load(Ordering::Relaxed),
+                completed: false,
+            })
+        })
+        .collect();
+    let now = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .map(|d| d.as_secs())
+        .unwrap_or(0);
+    let file_name = dest_path
+        .file_name()
+        .and_then(|n| n.to_str())
+        .unwrap_or("")
+        .to_string();
+    let snapshot = DownloadMeta {
+        download_id,
+        url: url.to_string(),
+        file_name,
+        total_bytes: Some(total_size),
+        segments: segments_meta,
+        checksum_expected: None,
+        created_at: now,
+        updated_at: now,
+    };
+    let storage = file_storage.clone();
+    let path = dest_path.to_path_buf();
+    let join = tokio::task::spawn_blocking(move || storage.write_meta(&path, &snapshot)).await;
+    match join {
+        Ok(Ok(())) => {}
+        Ok(Err(e)) => tracing::warn!(
+            download_id = download_id.0,
+            error = %e,
+            "persist meta after split failed (download still proceeds)"
+        ),
+        Err(e) => tracing::warn!(
+            download_id = download_id.0,
+            error = %e,
+            "persist meta after split task panicked"
+        ),
+    }
+}
+
 pub struct SegmentedDownloadEngine {
     client: reqwest::Client,
     file_storage: Arc<dyn FileStorage>,
     event_bus: Arc<dyn EventBus>,
     default_segments: u32,
     min_segment_bytes: u64,
+    dynamic_split_enabled: bool,
+    dynamic_split_min_remaining_bytes: u64,
     active_downloads: Arc<Mutex<HashMap<DownloadId, ActiveDownload>>>,
 }
 
@@ -42,6 +155,8 @@ impl SegmentedDownloadEngine {
             event_bus,
             default_segments: default_segments.max(1),
             min_segment_bytes: 64 * 1024,
+            dynamic_split_enabled: true,
+            dynamic_split_min_remaining_bytes: 4 * 1024 * 1024,
             active_downloads: Arc::new(Mutex::new(HashMap::new())),
         }
     }
@@ -51,6 +166,15 @@ impl SegmentedDownloadEngine {
         self
     }
 
+    /// Configure runtime re-splitting of slow segments. PRD §7.1.
+    /// `min_remaining_mb == 0` disables the size gate entirely; the engine
+    /// then only refuses to split if the candidate has 0 bytes left.
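+    ///
+    /// A minimal usage sketch (illustrative; mirrors the `lib.rs` wiring
+    /// later in this patch, with the default values):
+    /// ```ignore
+    /// let engine = SegmentedDownloadEngine::new(client, storage, bus, 4)
+    ///     .with_dynamic_split(true, 4); // refuse splits below 4 MiB remaining
+    /// ```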
+    pub fn with_dynamic_split(mut self, enabled: bool, min_remaining_mb: u64) -> Self {
+        self.dynamic_split_enabled = enabled;
+        self.dynamic_split_min_remaining_bytes = min_remaining_mb.saturating_mul(1024 * 1024);
+        self
+    }
+
     async fn probe_remote_metadata(
         client: &reqwest::Client,
         url: &str,
@@ -133,6 +257,8 @@ impl DownloadEngine for SegmentedDownloadEngine {
         let event_bus = self.event_bus.clone();
         let active_downloads = self.active_downloads.clone();
         let min_segment_bytes = self.min_segment_bytes;
+        let dynamic_split_enabled = self.dynamic_split_enabled;
+        let dynamic_split_min_remaining = self.dynamic_split_min_remaining_bytes;
 
         tokio::spawn(async move {
             let (total_size, supports_range) =
@@ -274,7 +400,17 @@ impl DownloadEngine for SegmentedDownloadEngine {
             let shared_downloaded = Arc::new(AtomicU64::new(0));
             let mut join_set = JoinSet::new();
+            let mut segment_state: Vec<SegmentRuntimeState> = Vec::with_capacity(segments.len());
             for (index, (start, end)) in segments.iter().enumerate() {
+                let (end_tx, end_rx) = watch::channel(*end);
+                let progress = Arc::new(AtomicU64::new(0));
+                segment_state.push(SegmentRuntimeState {
+                    end_tx,
+                    progress: progress.clone(),
+                    started_at: std::time::Instant::now(),
+                    start_byte: *start,
+                    initial_end: *end,
+                });
                 join_set.spawn(download_segment(SegmentParams {
                     client: client.clone(),
                     file_storage: file_storage.clone(),
@@ -283,22 +419,95 @@ impl DownloadEngine for SegmentedDownloadEngine {
                     segment_index: index as u32,
                     url: url.clone(),
                     start_byte: *start,
-                    end_byte: *end,
+                    end_byte_rx: end_rx,
                     already_downloaded: 0,
                     total_file_size: total_size,
                     dest_path: dest_path.clone(),
                     pause_rx: pause_rx.clone(),
                     cancel_token: cancel_token.clone(),
                     shared_downloaded: shared_downloaded.clone(),
+                    segment_progress: progress,
                 }));
             }
 
             let mut failed = false;
             let mut error_msg = String::new();
+            let mut next_segment_id: u32 = segments.len() as u32;
+            let mut active_segments: Vec<Option<SegmentRuntimeState>> =
+                segment_state.into_iter().map(Some).collect();
 
             while let Some(result) = join_set.join_next().await {
                 match result {
-                    Ok(Ok(_bytes)) => {}
+                    Ok(Ok(_bytes)) => {
+                        if dynamic_split_enabled
+                            && !cancel_token.is_cancelled()
+                            && let Some((idx, split_at)) =
+                                pick_split_target(&active_segments, dynamic_split_min_remaining)
+                        {
+                            let new_id = next_segment_id;
+                            next_segment_id += 1;
+                            // Capture state needed before we touch it again.
+ let (initial_end, signal_sent) = { + let old_state = active_segments[idx] + .as_ref() + .expect("slot present at split time"); + let initial_end = old_state.initial_end; + let signal_sent = old_state.end_tx.send(split_at).is_ok(); + (initial_end, signal_sent) + }; + if !signal_sent { + tracing::warn!( + download_id = download_id.0, + original_segment_id = idx as u32, + "split skipped: target worker no longer listening" + ); + continue; + } + event_bus.publish(DomainEvent::SegmentSplit { + download_id, + original_segment_id: idx as u32, + new_segment_id: new_id, + split_at, + }); + + let new_progress = Arc::new(AtomicU64::new(0)); + let (new_end_tx, new_end_rx) = watch::channel(initial_end); + join_set.spawn(download_segment(SegmentParams { + client: client.clone(), + file_storage: file_storage.clone(), + event_bus: event_bus.clone(), + download_id, + segment_index: new_id, + url: url.clone(), + start_byte: split_at, + end_byte_rx: new_end_rx, + already_downloaded: 0, + total_file_size: total_size, + dest_path: dest_path.clone(), + pause_rx: pause_rx.clone(), + cancel_token: cancel_token.clone(), + shared_downloaded: shared_downloaded.clone(), + segment_progress: new_progress.clone(), + })); + active_segments.push(Some(SegmentRuntimeState { + end_tx: new_end_tx, + progress: new_progress, + started_at: std::time::Instant::now(), + start_byte: split_at, + initial_end, + })); + + persist_split_meta( + &file_storage, + &dest_path, + download_id, + &url, + total_size, + &active_segments, + ) + .await; + } + } Ok(Err(e)) => match e { SegmentError::Cancelled => { cancel_token.cancel(); @@ -761,6 +970,153 @@ mod tests { ); } + #[tokio::test] + async fn test_dynamic_split_skipped_when_remaining_too_small() { + // 2 KiB total, 4 segments, min_remaining 4 MiB → split must NOT trigger. + let server = MockServer::start().await; + let body = vec![b'a'; 2048]; + + Mock::given(method("HEAD")) + .and(path("/small")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("content-length", "2048") + .insert_header("accept-ranges", "bytes"), + ) + .mount(&server) + .await; + + Mock::given(method("GET")) + .and(path("/small")) + .respond_with(ResponseTemplate::new(206).set_body_bytes(body)) + .mount(&server) + .await; + + let storage = Arc::new(MockFileStorage::new()); + let bus = Arc::new(CollectingEventBus::new()); + let engine = SegmentedDownloadEngine::new(reqwest::Client::new(), storage, bus.clone(), 4) + .with_min_segment_bytes(256) + .with_dynamic_split(true, 4); // 4 MiB threshold blocks 2 KiB file + + let url = format!("{}/small", server.uri()); + let download = make_download(70, &url); + engine.start(&download).unwrap(); + + let found = bus + .wait_for_event_async( + |e| matches!(e, DomainEvent::DownloadCompleted { id } if id.0 == 70), + Duration::from_secs(5), + ) + .await; + assert!(found, "download did not complete"); + + let events = bus.collected(); + assert!( + !events + .iter() + .any(|e| matches!(e, DomainEvent::SegmentSplit { .. 
})), + "no split should fire when remaining < threshold; got {events:?}" + ); + } + + #[tokio::test] + async fn test_dynamic_split_disabled_via_config_does_not_split() { + let server = MockServer::start().await; + let body = vec![b'x'; 64 * 1024]; + + Mock::given(method("HEAD")) + .and(path("/disabled")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("content-length", "65536") + .insert_header("accept-ranges", "bytes"), + ) + .mount(&server) + .await; + Mock::given(method("GET")) + .and(path("/disabled")) + .respond_with(ResponseTemplate::new(206).set_body_bytes(body)) + .mount(&server) + .await; + + let storage = Arc::new(MockFileStorage::new()); + let bus = Arc::new(CollectingEventBus::new()); + let engine = SegmentedDownloadEngine::new(reqwest::Client::new(), storage, bus.clone(), 4) + .with_min_segment_bytes(1024) + .with_dynamic_split(false, 0); + + let url = format!("{}/disabled", server.uri()); + let download = make_download(71, &url); + engine.start(&download).unwrap(); + + let found = bus + .wait_for_event_async( + |e| matches!(e, DomainEvent::DownloadCompleted { id } if id.0 == 71), + Duration::from_secs(5), + ) + .await; + assert!(found); + let events = bus.collected(); + assert!( + !events + .iter() + .any(|e| matches!(e, DomainEvent::SegmentSplit { .. })), + "split must not fire when disabled" + ); + } + + #[test] + fn test_pick_split_target_prefers_slowest_above_threshold() { + let make = |start: u64, end: u64, downloaded: u64, age_ms: u64| { + Some(SegmentRuntimeState { + end_tx: watch::channel(end).0, + progress: Arc::new(AtomicU64::new(downloaded)), + started_at: std::time::Instant::now() - std::time::Duration::from_millis(age_ms), + start_byte: start, + initial_end: end, + }) + }; + let segs = [ + // fast: 1 MiB downloaded in 100 ms → 10 MiB/s + make(0, 16 * 1024 * 1024, 1024 * 1024, 100), + // slow: 100 KiB in 1000 ms → ~100 KiB/s, plenty of remaining + make(16 * 1024 * 1024, 32 * 1024 * 1024, 100 * 1024, 1000), + // tiny remaining → must be filtered + make(32 * 1024 * 1024, 32 * 1024 * 1024 + 1024, 512, 200), + ]; + let pick = pick_split_target(&segs, 4 * 1024 * 1024); + assert_eq!( + pick.map(|(i, _)| i), + Some(1), + "expected slot 1 (slowest with enough remaining), got {pick:?}" + ); + let (_, split_at) = pick.unwrap(); + assert!( + split_at > 16 * 1024 * 1024 + 100 * 1024, + "split must be above current offset" + ); + assert!( + split_at < 32 * 1024 * 1024, + "split must be below initial_end" + ); + } + + #[test] + fn test_pick_split_target_returns_none_when_all_below_threshold() { + let make = |start: u64, end: u64, downloaded: u64| { + Some(SegmentRuntimeState { + end_tx: watch::channel(end).0, + progress: Arc::new(AtomicU64::new(downloaded)), + started_at: std::time::Instant::now(), + start_byte: start, + initial_end: end, + }) + }; + let segs = [make(0, 1024, 100), make(1024, 2048, 0), make(2048, 3072, 0)]; + let pick = pick_split_target(&segs, 4 * 1024 * 1024); + assert!(pick.is_none(), "got {pick:?}"); + } + #[tokio::test] async fn test_pause_unknown_id_returns_not_found() { let storage = Arc::new(MockFileStorage::new()); diff --git a/src-tauri/src/adapters/driven/network/segment_worker.rs b/src-tauri/src/adapters/driven/network/segment_worker.rs index 621a663..77a66c2 100644 --- a/src-tauri/src/adapters/driven/network/segment_worker.rs +++ b/src-tauri/src/adapters/driven/network/segment_worker.rs @@ -21,6 +21,14 @@ pub(crate) enum SegmentError { PauseChannelClosed, } +/// Read the watched end_byte. 
Returns `None` for the unbounded sentinel
+/// (`u64::MAX`, used when the server didn't advertise a length and we
+/// cannot send a Range header).
+fn bounded(end_rx: &watch::Receiver<u64>) -> Option<u64> {
+    let v = *end_rx.borrow();
+    if v == u64::MAX { None } else { Some(v) }
+}
+
 /// Parameters for a single segment download.
 pub(crate) struct SegmentParams {
     pub client: reqwest::Client,
@@ -30,8 +38,10 @@ pub(crate) struct SegmentParams {
     pub segment_index: u32,
     pub url: String,
     pub start_byte: u64,
-    /// Exclusive upper bound of this segment's byte range.
-    pub end_byte: u64,
+    /// Watchable exclusive upper bound. May be reduced mid-flight by the
+    /// engine to support PRD §7.1 dynamic splitting. `u64::MAX` means
+    /// "unbounded — no Range header" and must not be reduced after start.
+    pub end_byte_rx: watch::Receiver<u64>,
     pub already_downloaded: u64,
     /// Total size of the entire file (used in progress events).
     pub total_file_size: u64,
@@ -40,6 +50,9 @@ pub(crate) struct SegmentParams {
     pub cancel_token: CancellationToken,
     /// Shared atomic counter for aggregate progress across all segments.
     pub shared_downloaded: Arc<AtomicU64>,
+    /// Per-segment downloaded counter, observable by the engine to estimate
+    /// throughput when picking a split target.
+    pub segment_progress: Arc<AtomicU64>,
 }
 
 /// Downloads a single byte range and writes it to disk.
@@ -55,24 +68,26 @@ pub(crate) async fn download_segment(params: SegmentParams) -> Result<u64, SegmentError> {
+    let initial_end = *end_byte_rx.borrow();
-    if effective_start >= end_byte {
+    if effective_start >= initial_end {
         event_bus.publish(DomainEvent::SegmentCompleted {
             download_id,
             segment_id: segment_index,
@@ -82,8 +97,8 @@ pub(crate) async fn download_segment(params: SegmentParams) -> Result<u64, SegmentError> {
+        if let Some(current_end) = bounded(&end_byte_rx)
+            && offset >= current_end
+        {
+            break;
+        }
+
         // Check pause state — if paused, wait with cancellation support
         if *pause_rx.borrow() {
             loop {
@@ -191,14 +214,21 @@ pub(crate) async fn download_segment(params: SegmentParams) -> Result<u64, SegmentError> {
-            if offset + chunk_len > end_byte {
-                let allowed = end_byte.saturating_sub(offset) as usize;
-                data.truncate(allowed);
-                chunk_len = allowed as u64;
-                if chunk_len == 0 {
+            // Re-read end_byte AFTER chunk fetch so an engine-driven mid-flight
+            // shrink that landed during the network read is honored before we
+            // write past the new boundary.
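+            // Illustrative timeline (hypothetical numbers): the worker fetched
+            // a 64 KiB chunk at offset 10 MiB while the engine shrank the bound
+            // to 10 MiB + 16 KiB; the re-read below truncates the write to
+            // 16 KiB and the following iteration breaks out of the loop.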
+ if let Some(live_end) = bounded(&end_byte_rx) { + if offset >= live_end { break; } + if offset + chunk_len > live_end { + let allowed = live_end.saturating_sub(offset) as usize; + data.truncate(allowed); + chunk_len = allowed as u64; + if chunk_len == 0 { + break; + } + } } tokio::task::spawn_blocking(move || storage.write_segment(&path, offset, &data)) @@ -216,6 +246,7 @@ pub(crate) async fn download_segment(params: SegmentParams) -> Result Result, pub verify_checksums: Option, pub pre_allocate_space: Option, + pub dynamic_split_enabled: Option, + pub dynamic_split_min_remaining_mb: Option, // History pub history_retention_days: Option, @@ -955,6 +957,8 @@ impl From for ConfigPatch { retry_delay_seconds: d.retry_delay_seconds, verify_checksums: d.verify_checksums, pre_allocate_space: d.pre_allocate_space, + dynamic_split_enabled: d.dynamic_split_enabled, + dynamic_split_min_remaining_mb: d.dynamic_split_min_remaining_mb, history_retention_days: d.history_retention_days, proxy_type: d.proxy_type, proxy_url: d.proxy_url, diff --git a/src-tauri/src/domain/event.rs b/src-tauri/src/domain/event.rs index aa1f430..08814db 100644 --- a/src-tauri/src/domain/event.rs +++ b/src-tauri/src/domain/event.rs @@ -88,6 +88,16 @@ pub enum DomainEvent { segment_id: u32, error: String, }, + /// A still-running segment was split in two by the dynamic-split + /// scheduler so the remaining range can be parallelised. The original + /// segment now ends at `split_at`; a fresh segment with `new_segment_id` + /// covers `[split_at, original_end)`. + SegmentSplit { + download_id: DownloadId, + original_segment_id: u32, + new_segment_id: u32, + split_at: u64, + }, // Plugins PluginLoaded { diff --git a/src-tauri/src/domain/model/config.rs b/src-tauri/src/domain/model/config.rs index 4fecd6d..053055b 100644 --- a/src-tauri/src/domain/model/config.rs +++ b/src-tauri/src/domain/model/config.rs @@ -30,6 +30,13 @@ pub struct AppConfig { pub retry_delay_seconds: u32, pub verify_checksums: bool, pub pre_allocate_space: bool, + /// Enable runtime re-split of slow segments when a faster segment + /// finishes. PRD §7.1 (répartition dynamique). + pub dynamic_split_enabled: bool, + /// Minimum remaining bytes (in MiB) required before a segment is + /// eligible for re-split. Below this threshold, the parallelism gain + /// is dwarfed by HTTP request and rebalance overhead. 
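+    /// (Example at the default of 4: a segment with 3 MiB left is never
+    /// split, while one with 24 MiB left splits at its remaining-range
+    /// midpoint, 12 MiB further in.)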
+ pub dynamic_split_min_remaining_mb: u64, // ── History ────────────────────────────────────────────────────── /// Number of days history entries are retained before automatic @@ -96,6 +103,8 @@ impl Default for AppConfig { retry_delay_seconds: 10, verify_checksums: true, pre_allocate_space: true, + dynamic_split_enabled: true, + dynamic_split_min_remaining_mb: 4, // History history_retention_days: 30, @@ -156,6 +165,8 @@ pub struct ConfigPatch { pub retry_delay_seconds: Option, pub verify_checksums: Option, pub pre_allocate_space: Option, + pub dynamic_split_enabled: Option, + pub dynamic_split_min_remaining_mb: Option, // History pub history_retention_days: Option, @@ -267,6 +278,12 @@ pub fn apply_patch(config: &mut AppConfig, patch: &ConfigPatch) { if let Some(v) = patch.pre_allocate_space { config.pre_allocate_space = v; } + if let Some(v) = patch.dynamic_split_enabled { + config.dynamic_split_enabled = v; + } + if let Some(v) = patch.dynamic_split_min_remaining_mb { + config.dynamic_split_min_remaining_mb = v; + } // History if let Some(v) = patch.history_retention_days { @@ -371,6 +388,29 @@ mod tests { assert!(config.api_key.is_empty()); } + #[test] + fn test_default_dynamic_split_enabled_and_min_remaining() { + let c = AppConfig::default(); + assert!( + c.dynamic_split_enabled, + "PRD §7.1: dynamic split on by default" + ); + assert_eq!(c.dynamic_split_min_remaining_mb, 4); + } + + #[test] + fn test_apply_patch_updates_dynamic_split_fields() { + let mut config = AppConfig::default(); + let patch = ConfigPatch { + dynamic_split_enabled: Some(false), + dynamic_split_min_remaining_mb: Some(16), + ..Default::default() + }; + apply_patch(&mut config, &patch); + assert!(!config.dynamic_split_enabled); + assert_eq!(config.dynamic_split_min_remaining_mb, 16); + } + #[test] fn test_normalize_max_concurrent_clamps_zero_to_min() { assert_eq!( diff --git a/src-tauri/src/domain/model/segment.rs b/src-tauri/src/domain/model/segment.rs index 14684b4..b6b07b8 100644 --- a/src-tauri/src/domain/model/segment.rs +++ b/src-tauri/src/domain/model/segment.rs @@ -129,6 +129,43 @@ impl Segment { self.downloaded_bytes = bytes; } + /// Split a downloading segment in two: shrink self to `[start, at_byte)` + /// and return a new pending segment covering `[at_byte, original_end)`. + /// + /// Used by the runtime engine to re-balance a slow segment when a faster + /// peer finishes (PRD §7.1 dynamic split). 
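+    ///
+    /// Example (the values used by the tests below): a `[0, 1000)` segment
+    /// that has fetched 200 bytes accepts any split point in `(200, 1000)`;
+    /// `split(600)` shrinks self to `[0, 600)` and returns a pending
+    /// `[600, 1000)` segment.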
+ pub fn split(&mut self, at_byte: u64) -> Result { + if self.state != SegmentState::Downloading { + return Err(DomainError::ValidationError(format!( + "cannot split segment in state {:?}", + self.state + ))); + } + let current_offset = self.start_byte + self.downloaded_bytes; + if at_byte <= current_offset { + return Err(DomainError::ValidationError(format!( + "split point {at_byte} must be strictly above current offset {current_offset}" + ))); + } + if at_byte >= self.end_byte { + return Err(DomainError::ValidationError(format!( + "split point {at_byte} must be strictly below end_byte {}", + self.end_byte + ))); + } + let upper = Segment { + id: self.id.wrapping_add(1_000_000), + download_id: self.download_id, + start_byte: at_byte, + end_byte: self.end_byte, + downloaded_bytes: 0, + state: SegmentState::Pending, + retry_count: 0, + }; + self.end_byte = at_byte; + Ok(upper) + } + // --- Getters --- pub fn id(&self) -> u32 { @@ -301,6 +338,55 @@ mod tests { ); } + #[test] + fn test_segment_split_returns_upper_half_and_shrinks_self() { + let mut s = Segment::new(1, DownloadId(10), 0, 1000); + s.start().unwrap(); + s.update_progress(200); + let upper = s.split(600).unwrap(); + // self keeps lower half + assert_eq!(s.start_byte(), 0); + assert_eq!(s.end_byte(), 600); + assert_eq!(s.downloaded_bytes(), 200); + assert_eq!(s.state(), SegmentState::Downloading); + // upper covers [600, 1000), pending + assert_eq!(upper.start_byte(), 600); + assert_eq!(upper.end_byte(), 1000); + assert_eq!(upper.downloaded_bytes(), 0); + assert_eq!(upper.state(), SegmentState::Pending); + assert_eq!(upper.download_id(), DownloadId(10)); + assert_ne!(upper.id(), s.id()); + } + + #[test] + fn test_segment_split_rejects_at_or_below_current_offset() { + let mut s = Segment::new(1, DownloadId(10), 0, 1000); + s.start().unwrap(); + s.update_progress(200); + assert!(matches!(s.split(200), Err(DomainError::ValidationError(_)))); + assert!(matches!(s.split(100), Err(DomainError::ValidationError(_)))); + } + + #[test] + fn test_segment_split_rejects_at_or_above_end_byte() { + let mut s = Segment::new(1, DownloadId(10), 0, 1000); + s.start().unwrap(); + assert!(matches!( + s.split(1000), + Err(DomainError::ValidationError(_)) + )); + assert!(matches!( + s.split(1500), + Err(DomainError::ValidationError(_)) + )); + } + + #[test] + fn test_segment_split_rejects_when_not_downloading() { + let mut s = Segment::new(1, DownloadId(10), 0, 1000); + assert!(matches!(s.split(500), Err(DomainError::ValidationError(_)))); + } + #[test] fn test_segment_validates_byte_range() { // equal bytes is valid (zero-length segment) diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index f5a80c8..8f2562e 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -211,12 +211,21 @@ pub fn run() { let plugin_loader: Arc = plugin_loader_impl.clone(); // ── Download engine ───────────────────────────────────── - let download_engine: Arc = Arc::new(SegmentedDownloadEngine::new( - reqwest_client, - file_storage.clone(), - event_bus.clone(), - 4, - )); + let initial_engine_config = config_store + .get_config() + .unwrap_or_else(|_| crate::domain::model::config::AppConfig::default()); + let download_engine: Arc = Arc::new( + SegmentedDownloadEngine::new( + reqwest_client, + file_storage.clone(), + event_bus.clone(), + 4, + ) + .with_dynamic_split( + initial_engine_config.dynamic_split_enabled, + initial_engine_config.dynamic_split_min_remaining_mb, + ), + ); // ── Startup recovery ──────────────────────────────────── // Orphaned downloads 
(Downloading/Waiting/Checking/Extracting From e8902bb478be2d28ffdd2873382d3a750cb93e20 Mon Sep 17 00:00:00 2001 From: Mathieu Piton <27002047+mpiton@users.noreply.github.com> Date: Sun, 26 Apr 2026 15:21:39 +0200 Subject: [PATCH 2/6] fix(download): address PR #111 review comments - update active_segments[idx].initial_end after successful end_tx.send so a subsequent pick_split_target cannot expand a shrunk worker past its new boundary, and persist_split_meta writes the post-split end instead of the stale one (greptile P1, coderabbit P1, cubic-dev-ai P1) - Segment::split now requires a caller-provided new_id, rejects collisions with self.id, and stops inventing IDs via wrapping_add to keep allocation in lockstep with the engine's monotonic counter (greptile P2, coderabbit minor, cubic-dev-ai P1+P2) - each spawned segment task now returns (slot_idx, Result); on completion the engine clears active_segments[slot_idx] to None so pick_split_target and persist_split_meta only see live segments (greptile P2) - SettingsDto now exposes dynamic_split_enabled and dynamic_split_min_remaining_mb so the frontend can read back the persisted values, not just write them (coderabbit major, cubic-dev-ai P2) - SegmentedDownloadEngine stores the dynamic-split knobs in atomic fields and exposes set_dynamic_split; new application::services::engine_config_bridge subscribes to SettingsUpdated and forwards live changes so settings updates reconfigure already-running engines without restart (cubic-dev-ai P2) CHANGELOG updated. Frontend AppConfig type and 7 test fixtures updated for the new fields. cargo test --workspace 930/930, clippy clean, fmt clean. vitest 582/582, oxlint clean, tsc clean. --- CHANGELOG.md | 2 +- .../driven/network/download_engine.rs | 105 +++++++---- src-tauri/src/adapters/driving/tauri_ipc.rs | 4 + .../services/engine_config_bridge.rs | 175 ++++++++++++++++++ src-tauri/src/application/services/mod.rs | 2 + src-tauri/src/domain/model/segment.rs | 48 ++++- src-tauri/src/lib.rs | 10 +- .../__tests__/ClipboardIndicator.test.tsx | 2 + src/hooks/__tests__/useAppEffects.test.ts | 2 + src/layouts/__tests__/AppLayout.test.tsx | 2 + src/stores/__tests__/settingsStore.test.ts | 2 + src/types/settings.ts | 2 + .../__tests__/LinkGrabberView.test.tsx | 2 + .../SettingsView/__tests__/Sections.test.tsx | 2 + .../__tests__/SettingsView.test.tsx | 2 + 15 files changed, 315 insertions(+), 47 deletions(-) create mode 100644 src-tauri/src/application/services/engine_config_bridge.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 918af5b..c31357e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Dynamic segment splitting (PRD-v2 P0.17, task 17): when a parallel segment finishes before its peers, the engine now re-evaluates the still-running segments, picks the slowest one whose remaining range exceeds `dynamic_split_min_remaining_mb` (default 4 MiB) and shrinks it in place — a fresh worker takes the upper half so the tail of the download accelerates instead of stalling on a single slow connection. 
Backend ships a domain-pure `Segment::split(at_byte)` validation method (state must be `Downloading`, split point strictly inside the unfetched range), a new `DomainEvent::SegmentSplit { download_id, original_segment_id, new_segment_id, split_at }` forwarded as the `segment-split` Tauri event and logged in the per-download log store, two new `AppConfig` / `ConfigPatch` fields `dynamic_split_enabled` (default `true`) and `dynamic_split_min_remaining_mb` (default `4`) wired through the toml config store and the Tauri IPC `ConfigPatchDto`, plus a `SegmentedDownloadEngine::with_dynamic_split(enabled, min_remaining_mb)` builder consumed at startup so the runtime engine reflects the persisted config without a restart. The segment worker accepts the upper bound through a `tokio::sync::watch::Receiver` instead of a frozen `u64`, re-reads it before each chunk fetch and again after every successful network read so a mid-flight shrink clamps the next write to the new boundary; per-segment progress is exposed via an `Arc` so the engine can pick the slowest candidate by throughput (`downloaded / elapsed`). After every split, the engine atomically rewrites `.vortex-meta` with the updated segment topology so resume after a crash mid-split sees a consistent state. (task 17) +- Dynamic segment splitting (PRD-v2 P0.17, task 17): when a parallel segment finishes before its peers, the engine now re-evaluates the still-running segments, picks the slowest one whose remaining range exceeds `dynamic_split_min_remaining_mb` (default 4 MiB) and shrinks it in place — a fresh worker takes the upper half so the tail of the download accelerates instead of stalling on a single slow connection. Backend ships a domain-pure `Segment::split(at_byte, new_id)` validation method (state must be `Downloading`, split point strictly inside the unfetched range, caller-provided id must differ from the original — IDs are allocated by the engine's monotonic `next_segment_id` counter, never invented inside the domain), a new `DomainEvent::SegmentSplit { download_id, original_segment_id, new_segment_id, split_at }` forwarded as the `segment-split` Tauri event and logged in the per-download log store, two new `AppConfig` / `ConfigPatch` / `SettingsDto` fields `dynamic_split_enabled` (default `true`) and `dynamic_split_min_remaining_mb` (default `4`) wired through the toml config store, the Tauri IPC `SettingsDto`/`ConfigPatchDto` (so the frontend can both read and write them) and the new `application::services::engine_config_bridge` subscriber so live `settings_update` calls reconfigure already-running engines without a restart. `SegmentedDownloadEngine` stores `dynamic_split_enabled` / `dynamic_split_min_remaining_bytes` in `Arc` / `Arc` and exposes a `set_dynamic_split(enabled, min_remaining_mb)` setter consumed by the bridge. After a split, the engine updates the original slot's `initial_end` to `split_at` immediately on successful `end_tx.send`, so a subsequent `pick_split_target` evaluation cannot expand the worker's range past the shrunk boundary and `persist_split_meta` records the post-split topology rather than the stale one (closes coderabbit P1 + greptile P1 race). Each segment task now returns `(slot_idx, Result)`; on completion the engine clears the slot to `None`, so completed segments are excluded from split candidate iteration and metadata snapshots. 
The segment worker accepts the upper bound through a `tokio::sync::watch::Receiver` instead of a frozen `u64`, re-reads it before each chunk fetch and again after every successful network read so a mid-flight shrink clamps the next write to the new boundary; per-segment progress is exposed via an `Arc` so the engine can pick the slowest candidate by throughput (`downloaded / elapsed`). After every split, the engine atomically rewrites `.vortex-meta` with the updated segment topology so resume after a crash mid-split sees a consistent state. (task 17, PR #111 review) - "Report broken plugin" action (PRD-v2 P0.16, task 16): plugins listed in *Plugins → Plugin Store* now expose a *Report broken plugin* item in their kebab menu. Clicking it opens the user's default browser at a pre-filled GitHub issue on the plugin's repository, with diagnostic metadata (plugin name + version, Vortex version, OS, optional URL under test, last 50 log lines) inlined into the issue body. Backend adds a `repository_url` field to `domain::model::plugin::PluginInfo` (parsed from the new `[plugin].repository` key in `plugin.toml`), a `domain::ports::driven::UrlOpener` port plus its platform-native `SystemUrlOpener` adapter (`xdg-open` / `open` / `cmd start`, `http(s)://` only by validation), the std-only `domain::model::plugin::build_report_broken_url` URL builder (RFC 3986 unreserved-set percent encoder, last 50 log lines, GitHub-only repository hosts, accepts `.git` suffix, rejects malformed URLs with `DomainError::ValidationError`), and a `ReportBrokenPluginCommand` handler that returns `AppError::Validation` when a manifest carries no `repository_url`. New Tauri IPC `plugin_report_broken(pluginName, logLines?, testedUrl?) → string` returns the issue URL so the UI can fall back to clipboard copy if the launcher fails. i18n (en/fr): `plugins.action.reportBroken`, `plugins.toast.reportBrokenSuccess`, `plugins.toast.reportBrokenError`. (task 16) - Dynamic plugin configuration UI (PRD-v2 P0.15, task 15): plugins declaring a `[config]` block in their `plugin.toml` now expose their schema at runtime. Backend adds `ConfigField` / `ConfigFieldType` / `PluginConfigSchema` to `domain/model/plugin.rs` (typed validation, enum options, `min`/`max` bounds, regex via a std-only matcher — no external import in the domain), a `PluginConfigStore` port (`get_values` / `set_value` / `list_all` / `delete_all`) implemented by `SqlitePluginConfigRepo` backed by the new `plugin_configs (plugin_name, key, value)` table (migration `m20260425_000005_create_plugin_configs`, composite primary key). The manifest parser (`adapters/driven/plugin/manifest.rs`) now extracts `type`, `default`, `options`, `description`, `min`, `max`, `regex` on top of the existing defaults, and rejects defaults that fail their own field validation. CQRS gains `UpdatePluginConfigCommand` (validates against the schema, applies the runtime first then persists, rolls back on failure) and `GetPluginConfigQuery` (returns the schema plus persisted values, dropping any persisted entry that no longer matches the current schema and falling back to manifest defaults). `PluginLoader` is extended with `get_manifest()` and `set_runtime_config()`; `ExtismPluginLoader` implements both by reading from `PluginRegistry` and writing to `SharedHostResources::plugin_configs`, so `get_config(key)` calls from the WASM plugin observe the new value without a reload. At startup, `lib.rs` replays persisted configs onto the in-memory map before plugins are loaded. 
Frontend adds two components: `PluginConfigField.tsx` (dispatcher renderer: `string` → text input, `boolean` → shadcn switch, `integer`/`float` → numeric input with bounds, `url` → url input, `enum` (and `string` with options) → shadcn select; `aria-describedby` on the control points to the error message) and `PluginConfigDialog.tsx` (loads the schema via `useQuery`, validates each field on the UI side (rejects empty floats, validates JSON arrays) before sending, persists changed values sequentially, guards the schema-reset effect while a save is in flight to avoid clobbering the draft, invalidates the query on success). `PluginsView` queries `plugin_config_get` for each installed plugin (keyed off the unfiltered installed list to avoid churn while typing in search) to decide whether the *Configure* button (Settings icon, next to the *More* menu) should render: a plugin without `[config]` exposes no button. New IPC commands `plugin_config_get(name) → PluginConfigView` and `plugin_config_update(name, key, value)`. i18n (en/fr): `plugins.action.configure`, `plugins.config.{title,description,loading,error,noFields,toast.{saveSuccess,validationFailed}}`. (task 15) - History retention with automatic daily purge (PRD-v2 P0.14, task 14): new `history_retention_days` setting (default 30, presets 7 / 30 / 90 / 365 / `0 = unlimited`) exposed in the *General* Settings tab as a `Select` dropdown wired to `settings_update`. Backend ships a `Clock` domain port (`SystemClock` adapter under `adapters/driven/scheduler/`) and a `HistoryPurgeWorker` daemon spawned during Tauri setup that hard-deletes `history` rows where `completed_at < now - retention_days * 86_400`. The worker persists its last run as a Unix-epoch timestamp inside `/.history_purge_state` (sentinel filename `HISTORY_PURGE_STATE_FILE`). On startup, the daemon reads the sentinel and either runs immediately (missing/stale) or sleeps for `SECS_PER_DAY - elapsed` so the first post-launch purge stays anchored to the previous successful run instead of drifting up to ~47h after a restart; the recurring loop then ticks every 24h via `tokio::time::interval` with `MissedTickBehavior::Skip`. `retention_days <= 0` is a no-op that does not write the sentinel, so the next run re-fires the moment the user re-enables retention; corrupt sentinels are treated as "never ran" so a stuck file never blocks the scheduler. The worker shares the same `Arc` and `Arc` the IPC layer already mutates, so a settings change is observed without restart. Domain helper `normalize_history_retention_days` clamps negatives back to `0` and is now applied at every write boundary — `apply_patch` (so a crafted `settings_update` payload cannot persist a negative) and `From for AppConfig` (so a hand-edited `config.toml` is normalized at load) — plus the worker itself for defense-in-depth. 
(task 14) diff --git a/src-tauri/src/adapters/driven/network/download_engine.rs b/src-tauri/src/adapters/driven/network/download_engine.rs index 68bd3e2..0e7e55d 100644 --- a/src-tauri/src/adapters/driven/network/download_engine.rs +++ b/src-tauri/src/adapters/driven/network/download_engine.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use tokio::sync::watch; @@ -137,8 +137,8 @@ pub struct SegmentedDownloadEngine { event_bus: Arc, default_segments: u32, min_segment_bytes: u64, - dynamic_split_enabled: bool, - dynamic_split_min_remaining_bytes: u64, + dynamic_split_enabled: Arc, + dynamic_split_min_remaining_bytes: Arc, active_downloads: Arc>>, } @@ -155,8 +155,8 @@ impl SegmentedDownloadEngine { event_bus, default_segments: default_segments.max(1), min_segment_bytes: 64 * 1024, - dynamic_split_enabled: true, - dynamic_split_min_remaining_bytes: 4 * 1024 * 1024, + dynamic_split_enabled: Arc::new(AtomicBool::new(true)), + dynamic_split_min_remaining_bytes: Arc::new(AtomicU64::new(4 * 1024 * 1024)), active_downloads: Arc::new(Mutex::new(HashMap::new())), } } @@ -169,12 +169,22 @@ impl SegmentedDownloadEngine { /// Configure runtime re-splitting of slow segments. PRD §7.1. /// `min_remaining_mb == 0` disables the size gate entirely; the engine /// then only refuses to split if the candidate has 0 bytes left. - pub fn with_dynamic_split(mut self, enabled: bool, min_remaining_mb: u64) -> Self { - self.dynamic_split_enabled = enabled; - self.dynamic_split_min_remaining_bytes = min_remaining_mb.saturating_mul(1024 * 1024); + pub fn with_dynamic_split(self, enabled: bool, min_remaining_mb: u64) -> Self { + self.set_dynamic_split(enabled, min_remaining_mb); self } + /// Update dynamic-split runtime parameters live. Used by the engine + /// config bridge so settings changes from the UI take effect on + /// already-running and newly-started downloads without restart. 
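+    ///
+    /// Illustrative call (hypothetical values): `engine.set_dynamic_split(false, 0)`
+    /// stops new splits from the next completion-driven check onwards;
+    /// already-shrunk workers keep their current bounds.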
+ pub fn set_dynamic_split(&self, enabled: bool, min_remaining_mb: u64) { + self.dynamic_split_enabled.store(enabled, Ordering::Relaxed); + self.dynamic_split_min_remaining_bytes.store( + min_remaining_mb.saturating_mul(1024 * 1024), + Ordering::Relaxed, + ); + } + async fn probe_remote_metadata( client: &reqwest::Client, url: &str, @@ -257,8 +267,8 @@ impl DownloadEngine for SegmentedDownloadEngine { let event_bus = self.event_bus.clone(); let active_downloads = self.active_downloads.clone(); let min_segment_bytes = self.min_segment_bytes; - let dynamic_split_enabled = self.dynamic_split_enabled; - let dynamic_split_min_remaining = self.dynamic_split_min_remaining_bytes; + let dynamic_split_enabled = self.dynamic_split_enabled.clone(); + let dynamic_split_min_remaining_bytes = self.dynamic_split_min_remaining_bytes.clone(); tokio::spawn(async move { let (total_size, supports_range) = @@ -399,7 +409,7 @@ impl DownloadEngine for SegmentedDownloadEngine { event_bus.publish(DomainEvent::DownloadStarted { id: download_id }); let shared_downloaded = Arc::new(AtomicU64::new(0)); - let mut join_set = JoinSet::new(); + let mut join_set: JoinSet<(usize, Result)> = JoinSet::new(); let mut segment_state: Vec = Vec::with_capacity(segments.len()); for (index, (start, end)) in segments.iter().enumerate() { let (end_tx, end_rx) = watch::channel(*end); @@ -411,7 +421,7 @@ impl DownloadEngine for SegmentedDownloadEngine { start_byte: *start, initial_end: *end, }); - join_set.spawn(download_segment(SegmentParams { + let params = SegmentParams { client: client.clone(), file_storage: file_storage.clone(), event_bus: event_bus.clone(), @@ -427,7 +437,9 @@ impl DownloadEngine for SegmentedDownloadEngine { cancel_token: cancel_token.clone(), shared_downloaded: shared_downloaded.clone(), segment_progress: progress, - })); + }; + let slot_idx = index; + join_set.spawn(async move { (slot_idx, download_segment(params).await) }); } let mut failed = false; @@ -438,21 +450,35 @@ impl DownloadEngine for SegmentedDownloadEngine { while let Some(result) = join_set.join_next().await { match result { - Ok(Ok(_bytes)) => { - if dynamic_split_enabled + Ok((slot_idx, Ok(_bytes))) => { + // Clear the completed slot so pick_split_target ignores it + // and persist_split_meta reflects the live topology. + if slot_idx < active_segments.len() { + active_segments[slot_idx] = None; + } + + if dynamic_split_enabled.load(Ordering::Relaxed) && !cancel_token.is_cancelled() - && let Some((idx, split_at)) = - pick_split_target(&active_segments, dynamic_split_min_remaining) + && let Some((idx, split_at)) = pick_split_target( + &active_segments, + dynamic_split_min_remaining_bytes.load(Ordering::Relaxed), + ) { let new_id = next_segment_id; next_segment_id += 1; - // Capture state needed before we touch it again. + // Capture state and update initial_end on success so a + // subsequent pick_split_target on the same slot — or a + // crash recovery via persist_split_meta — observes the + // shrunk range, not the pre-split end. 
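+                        // (Illustrative numbers: if this slot shrank from an
+                        // end of 32 MiB to 20 MiB, a later pick must observe
+                        // initial_end == 20 MiB, otherwise the same upper
+                        // range could be handed to two workers.)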
let (initial_end, signal_sent) = { let old_state = active_segments[idx] - .as_ref() + .as_mut() .expect("slot present at split time"); let initial_end = old_state.initial_end; let signal_sent = old_state.end_tx.send(split_at).is_ok(); + if signal_sent { + old_state.initial_end = split_at; + } (initial_end, signal_sent) }; if !signal_sent { @@ -472,7 +498,8 @@ impl DownloadEngine for SegmentedDownloadEngine { let new_progress = Arc::new(AtomicU64::new(0)); let (new_end_tx, new_end_rx) = watch::channel(initial_end); - join_set.spawn(download_segment(SegmentParams { + let new_slot_idx = active_segments.len(); + let params = SegmentParams { client: client.clone(), file_storage: file_storage.clone(), event_bus: event_bus.clone(), @@ -488,7 +515,10 @@ impl DownloadEngine for SegmentedDownloadEngine { cancel_token: cancel_token.clone(), shared_downloaded: shared_downloaded.clone(), segment_progress: new_progress.clone(), - })); + }; + join_set.spawn(async move { + (new_slot_idx, download_segment(params).await) + }); active_segments.push(Some(SegmentRuntimeState { end_tx: new_end_tx, progress: new_progress, @@ -508,23 +538,28 @@ impl DownloadEngine for SegmentedDownloadEngine { .await; } } - Ok(Err(e)) => match e { - SegmentError::Cancelled => { - cancel_token.cancel(); + Ok((slot_idx, Err(e))) => { + if slot_idx < active_segments.len() { + active_segments[slot_idx] = None; } - _ => { - if failed { - tracing::warn!( - download_id = download_id.0, - previous_error = %error_msg, - "additional segment failure (overwriting previous error)" - ); + match e { + SegmentError::Cancelled => { + cancel_token.cancel(); + } + _ => { + if failed { + tracing::warn!( + download_id = download_id.0, + previous_error = %error_msg, + "additional segment failure (overwriting previous error)" + ); + } + error_msg = format!("{e:?}"); + failed = true; + cancel_token.cancel(); } - error_msg = format!("{e:?}"); - failed = true; - cancel_token.cancel(); } - }, + } Err(e) => { error_msg = format!("segment task panicked: {e}"); failed = true; diff --git a/src-tauri/src/adapters/driving/tauri_ipc.rs b/src-tauri/src/adapters/driving/tauri_ipc.rs index 5fee858..063eb8e 100644 --- a/src-tauri/src/adapters/driving/tauri_ipc.rs +++ b/src-tauri/src/adapters/driving/tauri_ipc.rs @@ -810,6 +810,8 @@ pub struct SettingsDto { pub retry_delay_seconds: u32, pub verify_checksums: bool, pub pre_allocate_space: bool, + pub dynamic_split_enabled: bool, + pub dynamic_split_min_remaining_mb: u64, // History pub history_retention_days: i64, @@ -864,6 +866,8 @@ impl From for SettingsDto { retry_delay_seconds: c.retry_delay_seconds, verify_checksums: c.verify_checksums, pre_allocate_space: c.pre_allocate_space, + dynamic_split_enabled: c.dynamic_split_enabled, + dynamic_split_min_remaining_mb: c.dynamic_split_min_remaining_mb, history_retention_days: c.history_retention_days, proxy_type: c.proxy_type, proxy_url: c.proxy_url, diff --git a/src-tauri/src/application/services/engine_config_bridge.rs b/src-tauri/src/application/services/engine_config_bridge.rs new file mode 100644 index 0000000..c872b8c --- /dev/null +++ b/src-tauri/src/application/services/engine_config_bridge.rs @@ -0,0 +1,175 @@ +//! Bridges `SettingsUpdated` events to live engine knobs. +//! +//! The download engine caches `dynamic_split_*` parameters in atomic +//! fields so settings changes from the UI take effect on already-running +//! and newly-started downloads without restart. Mirrors the pattern used +//! by [`super::queue_config_bridge`] for `max_concurrent_downloads`. 
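+//!
+//! Wiring sketch (illustrative; the handles are assumed to be the ones
+//! assembled in `lib.rs`):
+//! ```ignore
+//! subscribe_engine_to_config(event_bus.as_ref(), config_store.clone(), engine.clone());
+//! ```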
+ +use std::sync::Arc; + +use crate::adapters::driven::network::SegmentedDownloadEngine; +use crate::domain::event::DomainEvent; +use crate::domain::ports::driven::{ConfigStore, EventBus}; + +/// Subscribe the engine to configuration updates. +/// +/// On every [`DomainEvent::SettingsUpdated`], reads the current +/// `dynamic_split_*` values and forwards them to +/// [`SegmentedDownloadEngine::set_dynamic_split`]. Read errors are +/// logged and swallowed so one bad read does not poison the +/// subscription. +pub fn subscribe_engine_to_config( + event_bus: &dyn EventBus, + config_store: Arc<dyn ConfigStore>, + engine: Arc<SegmentedDownloadEngine>, +) { + event_bus.subscribe(Box::new(move |event| { + if !matches!(event, DomainEvent::SettingsUpdated) { + return; + } + match config_store.get_config() { + Ok(config) => { + engine.set_dynamic_split( + config.dynamic_split_enabled, + config.dynamic_split_min_remaining_mb, + ); + } + Err(err) => { + tracing::error!(%err, "engine_config_bridge: failed to read config"); + } + } + })); +} + +#[cfg(test)] +mod tests { + use std::path::Path; + use std::sync::Mutex; + + use super::*; + use crate::domain::error::DomainError; + use crate::domain::model::config::{AppConfig, ConfigPatch, apply_patch}; + use crate::domain::model::download::DownloadId; + use crate::domain::model::meta::DownloadMeta; + use crate::domain::ports::driven::FileStorage; + + struct StubConfigStore { + config: Mutex<AppConfig>, + } + + impl ConfigStore for StubConfigStore { + fn get_config(&self) -> Result<AppConfig, DomainError> { + Ok(self.config.lock().unwrap().clone()) + } + + fn update_config(&self, patch: ConfigPatch) -> Result<AppConfig, DomainError> { + let mut cfg = self.config.lock().unwrap(); + apply_patch(&mut cfg, &patch); + Ok(cfg.clone()) + } + } + + type Handler = Box<dyn Fn(&DomainEvent) + Send + Sync>; + + struct SyncEventBus { + handlers: Mutex<Vec<Handler>>, + } + + impl SyncEventBus { + fn new() -> Self { + Self { + handlers: Mutex::new(Vec::new()), + } + } + } + + impl EventBus for SyncEventBus { + fn publish(&self, event: DomainEvent) { + let handlers = self.handlers.lock().unwrap(); + for handler in handlers.iter() { + handler(&event); + } + } + + fn subscribe(&self, handler: Handler) { + self.handlers.lock().unwrap().push(handler); + } + } + + struct NoopStorage; + impl FileStorage for NoopStorage { + fn create_file(&self, _path: &Path, _size: u64) -> Result<(), DomainError> { + Ok(()) + } + fn write_segment( + &self, + _path: &Path, + _offset: u64, + _data: &[u8], + ) -> Result<(), DomainError> { + Ok(()) + } + fn read_meta(&self, _path: &Path) -> Result<Option<DownloadMeta>, DomainError> { + Ok(None) + } + fn write_meta(&self, _path: &Path, _meta: &DownloadMeta) -> Result<(), DomainError> { + Ok(()) + } + fn delete_meta(&self, _path: &Path) -> Result<(), DomainError> { + Ok(()) + } + } + + fn make_engine() -> Arc<SegmentedDownloadEngine> { + Arc::new(SegmentedDownloadEngine::new( + reqwest::Client::new(), + Arc::new(NoopStorage), + Arc::new(SyncEventBus::new()), + 4, + )) + } + + #[tokio::test] + async fn test_settings_updated_propagates_dynamic_split_changes() { + let cfg = AppConfig { + dynamic_split_enabled: false, + dynamic_split_min_remaining_mb: 16, + ..AppConfig::default() + }; + let config_store: Arc<dyn ConfigStore> = Arc::new(StubConfigStore { + config: Mutex::new(cfg), + }); + let bus = SyncEventBus::new(); + let engine = make_engine(); + + // Engine starts with builder defaults (enabled=true, 4 MiB).
+ engine.set_dynamic_split(true, 4); + subscribe_engine_to_config(&bus, Arc::clone(&config_store), Arc::clone(&engine)); + + bus.publish(DomainEvent::SettingsUpdated); + // Bridge picked up the persisted (false, 16 MiB) — observable via a + // follow-up patch + publish that flips both values. + config_store + .update_config(ConfigPatch { + dynamic_split_enabled: Some(true), + dynamic_split_min_remaining_mb: Some(8), + ..Default::default() + }) + .unwrap(); + bus.publish(DomainEvent::SettingsUpdated); + } + + #[tokio::test] + async fn test_non_settings_events_are_ignored() { + let cfg = AppConfig::default(); + let config_store: Arc<dyn ConfigStore> = Arc::new(StubConfigStore { + config: Mutex::new(cfg), + }); + let bus = SyncEventBus::new(); + let engine = make_engine(); + subscribe_engine_to_config(&bus, Arc::clone(&config_store), Arc::clone(&engine)); + + // Non-Settings events must not panic and must be a no-op. + bus.publish(DomainEvent::DownloadStarted { id: DownloadId(1) }); + } +} diff --git a/src-tauri/src/application/services/mod.rs b/src-tauri/src/application/services/mod.rs index 061537b..a3ef2d7 100644 --- a/src-tauri/src/application/services/mod.rs +++ b/src-tauri/src/application/services/mod.rs @@ -1,8 +1,10 @@ pub mod checksum_validator; +pub mod engine_config_bridge; pub mod queue_config_bridge; pub mod queue_manager; pub mod startup_recovery; pub use checksum_validator::{ChecksumOutcome, ChecksumValidatorService}; +pub use engine_config_bridge::subscribe_engine_to_config; pub use queue_config_bridge::subscribe_queue_to_config; pub use queue_manager::QueueManager; diff --git a/src-tauri/src/domain/model/segment.rs b/src-tauri/src/domain/model/segment.rs index b6b07b8..fb38e84 100644 --- a/src-tauri/src/domain/model/segment.rs +++ b/src-tauri/src/domain/model/segment.rs @@ -132,15 +132,24 @@ impl Segment { /// Split a downloading segment in two: shrink self to `[start, at_byte)` /// and return a new pending segment covering `[at_byte, original_end)`. /// + /// `new_id` is supplied by the caller because segment IDs are allocated + /// by the engine's monotonic counter — the domain method must not invent + /// IDs that could collide with engine-assigned ones. + /// /// Used by the runtime engine to re-balance a slow segment when a faster /// peer finishes (PRD §7.1 dynamic split).
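+ /// + /// Minimal usage sketch (values mirror the unit tests below; assumes a + /// `Result` context for `?`): + /// + /// ```ignore + /// let mut seg = Segment::new(1, DownloadId(10), 0, 1000); + /// seg.start()?; + /// seg.update_progress(200); + /// let upper = seg.split(600, 42)?; // 42 = engine-allocated id + /// assert_eq!((seg.end_byte(), upper.start_byte()), (600, 600)); + /// ```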
- pub fn split(&mut self, at_byte: u64) -> Result<Segment, DomainError> { + pub fn split(&mut self, at_byte: u64, new_id: u32) -> Result<Segment, DomainError> { if self.state != SegmentState::Downloading { return Err(DomainError::ValidationError(format!( "cannot split segment in state {:?}", self.state ))); } + if new_id == self.id { + return Err(DomainError::ValidationError(format!( + "split id {new_id} collides with original segment id" + ))); + } let current_offset = self.start_byte + self.downloaded_bytes; if at_byte <= current_offset { return Err(DomainError::ValidationError(format!( @@ -154,7 +163,7 @@ ))); } let upper = Segment { - id: self.id.wrapping_add(1_000_000), + id: new_id, download_id: self.download_id, start_byte: at_byte, end_byte: self.end_byte, @@ -343,19 +352,19 @@ mod tests { let mut s = Segment::new(1, DownloadId(10), 0, 1000); s.start().unwrap(); s.update_progress(200); - let upper = s.split(600).unwrap(); + let upper = s.split(600, 42).unwrap(); // self keeps lower half assert_eq!(s.start_byte(), 0); assert_eq!(s.end_byte(), 600); assert_eq!(s.downloaded_bytes(), 200); assert_eq!(s.state(), SegmentState::Downloading); - // upper covers [600, 1000), pending + // upper covers [600, 1000), pending, with caller-provided id assert_eq!(upper.start_byte(), 600); assert_eq!(upper.end_byte(), 1000); assert_eq!(upper.downloaded_bytes(), 0); assert_eq!(upper.state(), SegmentState::Pending); assert_eq!(upper.download_id(), DownloadId(10)); - assert_ne!(upper.id(), s.id()); + assert_eq!(upper.id(), 42); } #[test] @@ -363,8 +372,14 @@ let mut s = Segment::new(1, DownloadId(10), 0, 1000); s.start().unwrap(); s.update_progress(200); - assert!(matches!(s.split(200), Err(DomainError::ValidationError(_)))); - assert!(matches!(s.split(100), Err(DomainError::ValidationError(_)))); + assert!(matches!( + s.split(200, 42), + Err(DomainError::ValidationError(_)) + )); + assert!(matches!( + s.split(100, 42), + Err(DomainError::ValidationError(_)) + )); } #[test] @@ -372,11 +387,11 @@ let mut s = Segment::new(1, DownloadId(10), 0, 1000); s.start().unwrap(); assert!(matches!( - s.split(1000), + s.split(1000, 42), Err(DomainError::ValidationError(_)) )); assert!(matches!( - s.split(1500), + s.split(1500, 42), Err(DomainError::ValidationError(_)) )); } @@ -384,7 +399,20 @@ #[test] fn test_segment_split_rejects_when_not_downloading() { let mut s = Segment::new(1, DownloadId(10), 0, 1000); - assert!(matches!(s.split(500), Err(DomainError::ValidationError(_)))); + assert!(matches!( + s.split(500, 42), + Err(DomainError::ValidationError(_)) + )); + } + + #[test] + fn test_segment_split_rejects_id_collision_with_self() { + let mut s = Segment::new(7, DownloadId(10), 0, 1000); + s.start().unwrap(); + assert!(matches!( + s.split(500, 7), + Err(DomainError::ValidationError(_)) + )); } #[test] diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 8f2562e..7c74fcb 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -214,7 +214,7 @@ pub fn run() { let initial_engine_config = config_store .get_config() .unwrap_or_else(|_| crate::domain::model::config::AppConfig::default()); - let download_engine: Arc<dyn DownloadEngine> = Arc::new( + let segmented_engine = Arc::new( SegmentedDownloadEngine::new( reqwest_client, file_storage.clone(), @@ -226,6 +226,14 @@ pub fn run() { initial_engine_config.dynamic_split_min_remaining_mb, ), ); + // Keep settings → engine bridge alive so UI changes to + // dynamic_split_* propagate without a restart.
+ application::services::subscribe_engine_to_config( + event_bus.as_ref(), + config_store.clone(), + segmented_engine.clone(), + ); + let download_engine: Arc<dyn DownloadEngine> = segmented_engine; // ── Startup recovery ──────────────────────────────────── // Orphaned downloads (Downloading/Waiting/Checking/Extracting diff --git a/src/components/__tests__/ClipboardIndicator.test.tsx b/src/components/__tests__/ClipboardIndicator.test.tsx index 67eec5e..a296032 100644 --- a/src/components/__tests__/ClipboardIndicator.test.tsx +++ b/src/components/__tests__/ClipboardIndicator.test.tsx @@ -30,6 +30,8 @@ const baseConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: false, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, proxyType: "none", proxyUrl: null, userAgent: "Vortex/1.0", diff --git a/src/hooks/__tests__/useAppEffects.test.ts b/src/hooks/__tests__/useAppEffects.test.ts index 67198ae..bb2965a 100644 --- a/src/hooks/__tests__/useAppEffects.test.ts +++ b/src/hooks/__tests__/useAppEffects.test.ts @@ -27,6 +27,8 @@ const baseConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: false, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, proxyType: 'none', proxyUrl: null, userAgent: 'Vortex/1.0', diff --git a/src/layouts/__tests__/AppLayout.test.tsx b/src/layouts/__tests__/AppLayout.test.tsx index 4877248..d7055b8 100644 --- a/src/layouts/__tests__/AppLayout.test.tsx +++ b/src/layouts/__tests__/AppLayout.test.tsx @@ -30,6 +30,8 @@ const baseConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: false, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, proxyType: "none", proxyUrl: null, userAgent: "Vortex/1.0", diff --git a/src/stores/__tests__/settingsStore.test.ts b/src/stores/__tests__/settingsStore.test.ts index 56ca684..882423f 100644 --- a/src/stores/__tests__/settingsStore.test.ts +++ b/src/stores/__tests__/settingsStore.test.ts @@ -28,6 +28,8 @@ const baseConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: false, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, proxyType: 'none', proxyUrl: null, userAgent: 'Vortex/1.0', diff --git a/src/types/settings.ts b/src/types/settings.ts index ae74e76..9c6e790 100644 --- a/src/types/settings.ts +++ b/src/types/settings.ts @@ -28,6 +28,8 @@ export interface AppConfig { retryDelaySeconds: number; verifyChecksums: boolean; preAllocateSpace: boolean; + dynamicSplitEnabled: boolean; + dynamicSplitMinRemainingMb: number; // History historyRetentionDays: number; diff --git a/src/views/LinkGrabberView/__tests__/LinkGrabberView.test.tsx b/src/views/LinkGrabberView/__tests__/LinkGrabberView.test.tsx index 6247e39..57202e5 100644 --- a/src/views/LinkGrabberView/__tests__/LinkGrabberView.test.tsx +++ b/src/views/LinkGrabberView/__tests__/LinkGrabberView.test.tsx @@ -28,6 +28,8 @@ const baseConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: false, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, proxyType: "none", proxyUrl: null, userAgent: "Vortex/1.0", diff --git a/src/views/SettingsView/__tests__/Sections.test.tsx b/src/views/SettingsView/__tests__/Sections.test.tsx index dcc612e..c73577b 100644 --- a/src/views/SettingsView/__tests__/Sections.test.tsx +++ b/src/views/SettingsView/__tests__/Sections.test.tsx @@ -44,6 +44,8 @@ const mockConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: true, + dynamicSplitEnabled: true,
+ dynamicSplitMinRemainingMb: 4, historyRetentionDays: 30, proxyType: "none", proxyUrl: null, diff --git a/src/views/SettingsView/__tests__/SettingsView.test.tsx b/src/views/SettingsView/__tests__/SettingsView.test.tsx index d98b17f..23d8144 100644 --- a/src/views/SettingsView/__tests__/SettingsView.test.tsx +++ b/src/views/SettingsView/__tests__/SettingsView.test.tsx @@ -33,6 +33,8 @@ const mockConfig: AppConfig = { retryDelaySeconds: 10, verifyChecksums: true, preAllocateSpace: true, + dynamicSplitEnabled: true, + dynamicSplitMinRemainingMb: 4, proxyType: "none", proxyUrl: null, userAgent: "Vortex/1.0", From 5670466f9cf1090a3d0611c803608c40aa7f767a Mon Sep 17 00:00:00 2001 From: Mathieu Piton <27002047+mpiton@users.noreply.github.com> Date: Sun, 26 Apr 2026 15:44:27 +0200 Subject: [PATCH 3/6] fix(download): assert engine state in config bridge tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previous test pair published events but never checked the engine — a broken bridge would have passed. Adds a `dynamic_split_state` accessor on `SegmentedDownloadEngine` and uses it to assert: - the engine flips from the seeded (true, 4 MiB) to the persisted (false, 16 MiB) when SettingsUpdated fires - a subsequent patch + publish flips again to (true, 8 MiB) - non-Settings events leave the engine state untouched Closes the cubic-dev-ai P3 + coderabbit nitpick on PR #111. --- .../driven/network/download_engine.rs | 11 +++++++ .../services/engine_config_bridge.rs | 29 +++++++++++++++---- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/src-tauri/src/adapters/driven/network/download_engine.rs b/src-tauri/src/adapters/driven/network/download_engine.rs index 0e7e55d..91e45fb 100644 --- a/src-tauri/src/adapters/driven/network/download_engine.rs +++ b/src-tauri/src/adapters/driven/network/download_engine.rs @@ -185,6 +185,17 @@ impl SegmentedDownloadEngine { ); } + /// Read back the current dynamic-split parameters as `(enabled, min_remaining_bytes)`. + /// Lets the bridge tests prove that a `SettingsUpdated` event actually + /// reaches the engine; also useful for diagnostics on a running download. + pub fn dynamic_split_state(&self) -> (bool, u64) { + ( + self.dynamic_split_enabled.load(Ordering::Relaxed), + self.dynamic_split_min_remaining_bytes + .load(Ordering::Relaxed), + ) + } + async fn probe_remote_metadata( client: &reqwest::Client, url: &str, diff --git a/src-tauri/src/application/services/engine_config_bridge.rs b/src-tauri/src/application/services/engine_config_bridge.rs index c872b8c..5472a4f 100644 --- a/src-tauri/src/application/services/engine_config_bridge.rs +++ b/src-tauri/src/application/services/engine_config_bridge.rs @@ -129,6 +129,8 @@ mod tests { )) } + const MIB: u64 = 1024 * 1024; + #[tokio::test] async fn test_settings_updated_propagates_dynamic_split_changes() { let cfg = AppConfig { @@ -142,13 +144,18 @@ mod tests { let bus = SyncEventBus::new(); let engine = make_engine(); - // Engine starts with builder defaults (enabled=true, 4 MiB). + // Seed the engine with values that differ from the persisted config so + // a successful bridge call has something to flip. engine.set_dynamic_split(true, 4); + assert_eq!(engine.dynamic_split_state(), (true, 4 * MIB)); + subscribe_engine_to_config(&bus, Arc::clone(&config_store), Arc::clone(&engine)); + // Publishing must propagate the persisted (false, 16 MiB) into the engine. 
bus.publish(DomainEvent::SettingsUpdated); - // Bridge picked up the persisted (false, 16 MiB) — observable via a - // follow-up patch + publish that flips both values. + assert_eq!(engine.dynamic_split_state(), (false, 16 * MIB)); + + // A subsequent patch + publish must flip both knobs again. config_store .update_config(ConfigPatch { dynamic_split_enabled: Some(true), @@ -157,19 +164,31 @@ }) .unwrap(); bus.publish(DomainEvent::SettingsUpdated); + assert_eq!(engine.dynamic_split_state(), (true, 8 * MIB)); } #[tokio::test] async fn test_non_settings_events_are_ignored() { - let cfg = AppConfig::default(); + // Persisted config differs from the engine state so a stray bridge + // call would be observable. + let cfg = AppConfig { + dynamic_split_enabled: false, + dynamic_split_min_remaining_mb: 32, + ..AppConfig::default() + }; let config_store: Arc<dyn ConfigStore> = Arc::new(StubConfigStore { config: Mutex::new(cfg), }); let bus = SyncEventBus::new(); let engine = make_engine(); + engine.set_dynamic_split(true, 4); + let before = engine.dynamic_split_state(); + assert_eq!(before, (true, 4 * MIB)); + subscribe_engine_to_config(&bus, Arc::clone(&config_store), Arc::clone(&engine)); - // Non-Settings events must not panic and must be a no-op. + // Non-Settings events must NOT touch the engine. bus.publish(DomainEvent::DownloadStarted { id: DownloadId(1) }); + assert_eq!(engine.dynamic_split_state(), before); } } From 34a6e61ba1ad2cb6c789e535b7f12058d76b95a9 Mon Sep 17 00:00:00 2001 From: Mathieu Piton <27002047+mpiton@users.noreply.github.com> Date: Sun, 26 Apr 2026 15:58:21 +0200 Subject: [PATCH 4/6] fix(download): preserve completed segments in split meta + sample gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two fixes for issues coderabbit flagged on PR #111: 1) Persisted split snapshots dropped completed byte ranges (Major). The previous fix nulled a slot in `active_segments` on completion, then called `persist_split_meta` from the same path. Because the serializer kept only `Option::Some` slots, every meta written after a split omitted the segment that had just finished. A crash between the split and the next progress write would resume without any record that those bytes were already on disk. `active_segments` is now `Vec<SegmentRuntimeState>` (no Option) and each entry carries a `completed: bool`. The coordinator sets the flag on `Ok(_bytes)` instead of removing the slot. `pick_split_target` skips completed slots so they cannot be re-picked. `persist_split_meta` serializes every slot, reporting `completed: true` and a full-range `downloaded_bytes` for finished segments so resume sees both live and completed ranges. 2) Fresh split children could be re-split before any throughput sample (Major). `pick_split_target` previously computed `bps = downloaded / elapsed` even for slots with `downloaded == 0` and `elapsed ≈ 0`, treating them as 0 B/s. A brand-new child therefore became the guaranteed "slowest" candidate on the next completion event, cascading splits into the newest range without any real slow-tail signal. Picker now skips a slot until it has produced a sample: `downloaded > 0` AND `elapsed >= MIN_SPLIT_SAMPLE_DURATION` (500 ms). Two new unit tests cover both rejection paths and a third asserts that completed slots are excluded from selection. cargo test --workspace 932/932, clippy clean, fmt clean.
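For reviewers, the gate boils down to a pure predicate; this is a sketch of the shape (names match the patch below, not the verbatim code, which inlines the checks per slot):

    // Eligible for a split only once the slot has a real throughput sample.
    fn has_throughput_sample(downloaded: u64, elapsed: std::time::Duration) -> bool {
        const MIN_SPLIT_SAMPLE_DURATION: std::time::Duration =
            std::time::Duration::from_millis(500);
        downloaded > 0 && elapsed >= MIN_SPLIT_SAMPLE_DURATION
    }

A slot failing either condition is skipped outright rather than scored as 0 B/s, so a fresh child can never look like the slowest worker.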
--- CHANGELOG.md | 2 +- .../driven/network/download_engine.rs | 184 ++++++++++++------ 2 files changed, 129 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c31357e..a8ab2fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Dynamic segment splitting (PRD-v2 P0.17, task 17): when a parallel segment finishes before its peers, the engine now re-evaluates the still-running segments, picks the slowest one whose remaining range exceeds `dynamic_split_min_remaining_mb` (default 4 MiB) and shrinks it in place — a fresh worker takes the upper half so the tail of the download accelerates instead of stalling on a single slow connection. Backend ships a domain-pure `Segment::split(at_byte, new_id)` validation method (state must be `Downloading`, split point strictly inside the unfetched range, caller-provided id must differ from the original — IDs are allocated by the engine's monotonic `next_segment_id` counter, never invented inside the domain), a new `DomainEvent::SegmentSplit { download_id, original_segment_id, new_segment_id, split_at }` forwarded as the `segment-split` Tauri event and logged in the per-download log store, two new `AppConfig` / `ConfigPatch` / `SettingsDto` fields `dynamic_split_enabled` (default `true`) and `dynamic_split_min_remaining_mb` (default `4`) wired through the toml config store, the Tauri IPC `SettingsDto`/`ConfigPatchDto` (so the frontend can both read and write them) and the new `application::services::engine_config_bridge` subscriber so live `settings_update` calls reconfigure already-running engines without a restart. `SegmentedDownloadEngine` stores `dynamic_split_enabled` / `dynamic_split_min_remaining_bytes` in `Arc<AtomicBool>` / `Arc<AtomicU64>` and exposes a `set_dynamic_split(enabled, min_remaining_mb)` setter consumed by the bridge. After a split, the engine updates the original slot's `initial_end` to `split_at` immediately on successful `end_tx.send`, so a subsequent `pick_split_target` evaluation cannot expand the worker's range past the shrunk boundary and `persist_split_meta` records the post-split topology rather than the stale one (closes coderabbit P1 + greptile P1 race). Each segment task now returns `(slot_idx, Result<u64, SegmentError>)`; on completion the engine clears the slot to `None`, so completed segments are excluded from split candidate iteration and metadata snapshots. The segment worker accepts the upper bound through a `tokio::sync::watch::Receiver<u64>` instead of a frozen `u64`, re-reads it before each chunk fetch and again after every successful network read so a mid-flight shrink clamps the next write to the new boundary; per-segment progress is exposed via an `Arc<AtomicU64>` so the engine can pick the slowest candidate by throughput (`downloaded / elapsed`). After every split, the engine atomically rewrites `.vortex-meta` with the updated segment topology so resume after a crash mid-split sees a consistent state. (task 17, PR #111 review) +- Dynamic segment splitting (PRD-v2 P0.17, task 17): when a parallel segment finishes before its peers, the engine now re-evaluates the still-running segments, picks the slowest one whose remaining range exceeds `dynamic_split_min_remaining_mb` (default 4 MiB) and shrinks it in place — a fresh worker takes the upper half so the tail of the download accelerates instead of stalling on a single slow connection.
Backend ships a domain-pure `Segment::split(at_byte, new_id)` validation method (state must be `Downloading`, split point strictly inside the unfetched range, caller-provided id must differ from the original — IDs are allocated by the engine's monotonic `next_segment_id` counter, never invented inside the domain), a new `DomainEvent::SegmentSplit { download_id, original_segment_id, new_segment_id, split_at }` forwarded as the `segment-split` Tauri event and logged in the per-download log store, two new `AppConfig` / `ConfigPatch` / `SettingsDto` fields `dynamic_split_enabled` (default `true`) and `dynamic_split_min_remaining_mb` (default `4`) wired through the toml config store, the Tauri IPC `SettingsDto`/`ConfigPatchDto` (so the frontend can both read and write them) and the new `application::services::engine_config_bridge` subscriber so live `settings_update` calls reconfigure already-running engines without a restart. `SegmentedDownloadEngine` stores `dynamic_split_enabled` / `dynamic_split_min_remaining_bytes` in `Arc<AtomicBool>` / `Arc<AtomicU64>` and exposes a `set_dynamic_split(enabled, min_remaining_mb)` setter consumed by the bridge. After a split, the engine updates the original slot's `initial_end` to `split_at` immediately on successful `end_tx.send`, so a subsequent `pick_split_target` evaluation cannot expand the worker's range past the shrunk boundary and `persist_split_meta` records the post-split topology rather than the stale one (closes coderabbit P1 + greptile P1 race). Each segment task now returns `(slot_idx, Result<u64, SegmentError>)`; on success the engine flips a `completed: bool` flag on the slot — `pick_split_target` skips completed slots so they cannot be re-picked, and `persist_split_meta` keeps the entry with `completed: true` and a full-range `downloaded_bytes` so a crash right after a split never loses the record of byte ranges already on disk. `pick_split_target` also gates on a 500 ms / non-zero-progress sample window: a fresh split child cannot be picked again until it has actually produced a throughput sample, preventing cascading fragmentation of the newest range. The segment worker accepts the upper bound through a `tokio::sync::watch::Receiver<u64>` instead of a frozen `u64`, re-reads it before each chunk fetch and again after every successful network read so a mid-flight shrink clamps the next write to the new boundary; per-segment progress is exposed via an `Arc<AtomicU64>` so the engine can pick the slowest candidate by throughput (`downloaded / elapsed`). After every split, the engine atomically rewrites `.vortex-meta` with the updated segment topology so resume after a crash mid-split sees a consistent state. (task 17, PR #111 review)
- "Report broken plugin" action (PRD-v2 P0.16, task 16): plugins listed in *Plugins → Plugin Store* now expose a *Report broken plugin* item in their kebab menu. Clicking it opens the user's default browser at a pre-filled GitHub issue on the plugin's repository, with diagnostic metadata (plugin name + version, Vortex version, OS, optional URL under test, last 50 log lines) inlined into the issue body.
Backend adds a `repository_url` field to `domain::model::plugin::PluginInfo` (parsed from the new `[plugin].repository` key in `plugin.toml`), a `domain::ports::driven::UrlOpener` port plus its platform-native `SystemUrlOpener` adapter (`xdg-open` / `open` / `cmd start`, `http(s)://` only by validation), the std-only `domain::model::plugin::build_report_broken_url` URL builder (RFC 3986 unreserved-set percent encoder, last 50 log lines, GitHub-only repository hosts, accepts `.git` suffix, rejects malformed URLs with `DomainError::ValidationError`), and a `ReportBrokenPluginCommand` handler that returns `AppError::Validation` when a manifest carries no `repository_url`. New Tauri IPC `plugin_report_broken(pluginName, logLines?, testedUrl?) → string` returns the issue URL so the UI can fall back to clipboard copy if the launcher fails. i18n (en/fr): `plugins.action.reportBroken`, `plugins.toast.reportBrokenSuccess`, `plugins.toast.reportBrokenError`. (task 16) - Dynamic plugin configuration UI (PRD-v2 P0.15, task 15): plugins declaring a `[config]` block in their `plugin.toml` now expose their schema at runtime. Backend adds `ConfigField` / `ConfigFieldType` / `PluginConfigSchema` to `domain/model/plugin.rs` (typed validation, enum options, `min`/`max` bounds, regex via a std-only matcher — no external import in the domain), a `PluginConfigStore` port (`get_values` / `set_value` / `list_all` / `delete_all`) implemented by `SqlitePluginConfigRepo` backed by the new `plugin_configs (plugin_name, key, value)` table (migration `m20260425_000005_create_plugin_configs`, composite primary key). The manifest parser (`adapters/driven/plugin/manifest.rs`) now extracts `type`, `default`, `options`, `description`, `min`, `max`, `regex` on top of the existing defaults, and rejects defaults that fail their own field validation. CQRS gains `UpdatePluginConfigCommand` (validates against the schema, applies the runtime first then persists, rolls back on failure) and `GetPluginConfigQuery` (returns the schema plus persisted values, dropping any persisted entry that no longer matches the current schema and falling back to manifest defaults). `PluginLoader` is extended with `get_manifest()` and `set_runtime_config()`; `ExtismPluginLoader` implements both by reading from `PluginRegistry` and writing to `SharedHostResources::plugin_configs`, so `get_config(key)` calls from the WASM plugin observe the new value without a reload. At startup, `lib.rs` replays persisted configs onto the in-memory map before plugins are loaded. Frontend adds two components: `PluginConfigField.tsx` (dispatcher renderer: `string` → text input, `boolean` → shadcn switch, `integer`/`float` → numeric input with bounds, `url` → url input, `enum` (and `string` with options) → shadcn select; `aria-describedby` on the control points to the error message) and `PluginConfigDialog.tsx` (loads the schema via `useQuery`, validates each field on the UI side (rejects empty floats, validates JSON arrays) before sending, persists changed values sequentially, guards the schema-reset effect while a save is in flight to avoid clobbering the draft, invalidates the query on success). `PluginsView` queries `plugin_config_get` for each installed plugin (keyed off the unfiltered installed list to avoid churn while typing in search) to decide whether the *Configure* button (Settings icon, next to the *More* menu) should render: a plugin without `[config]` exposes no button. 
New IPC commands `plugin_config_get(name) → PluginConfigView` and `plugin_config_update(name, key, value)`. i18n (en/fr): `plugins.action.configure`, `plugins.config.{title,description,loading,error,noFields,toast.{saveSuccess,validationFailed}}`. (task 15)
- History retention with automatic daily purge (PRD-v2 P0.14, task 14): new `history_retention_days` setting (default 30, presets 7 / 30 / 90 / 365 / `0 = unlimited`) exposed in the *General* Settings tab as a `Select` dropdown wired to `settings_update`. Backend ships a `Clock` domain port (`SystemClock` adapter under `adapters/driven/scheduler/`) and a `HistoryPurgeWorker` daemon spawned during Tauri setup that hard-deletes `history` rows where `completed_at < now - retention_days * 86_400`. The worker persists its last run as a Unix-epoch timestamp inside the `.history_purge_state` sentinel file (filename constant `HISTORY_PURGE_STATE_FILE`). On startup, the daemon reads the sentinel and either runs immediately (missing/stale) or sleeps for `SECS_PER_DAY - elapsed` so the first post-launch purge stays anchored to the previous successful run instead of drifting up to ~47h after a restart; the recurring loop then ticks every 24h via `tokio::time::interval` with `MissedTickBehavior::Skip`. `retention_days <= 0` is a no-op that does not write the sentinel, so the next run re-fires the moment the user re-enables retention; corrupt sentinels are treated as "never ran" so a stuck file never blocks the scheduler. The worker shares the same `Arc`-wrapped config store and history store the IPC layer already mutates, so a settings change is observed without restart. Domain helper `normalize_history_retention_days` clamps negatives back to `0` and is now applied at every write boundary — `apply_patch` (so a crafted `settings_update` payload cannot persist a negative) and `From<ConfigDto> for AppConfig` (so a hand-edited `config.toml` is normalized at load) — plus the worker itself for defense-in-depth. (task 14) diff --git a/src-tauri/src/adapters/driven/network/download_engine.rs b/src-tauri/src/adapters/driven/network/download_engine.rs index 91e45fb..f91fc12 100644 --- a/src-tauri/src/adapters/driven/network/download_engine.rs +++ b/src-tauri/src/adapters/driven/network/download_engine.rs @@ -21,6 +21,14 @@ struct ActiveDownload { pause_sender: watch::Sender<bool>, } +/// Minimum age and downloaded bytes a segment must have before it is +/// eligible for split. Without this gate a fresh split child (downloaded == 0, +/// elapsed ≈ 0) would compute as 0 B/s, become the guaranteed "slowest" +/// candidate, and be re-split immediately on the next completion event — +/// cascading fragmentation of the newest range without any real slow-tail +/// signal. +const MIN_SPLIT_SAMPLE_DURATION: std::time::Duration = std::time::Duration::from_millis(500); + /// Runtime state of one in-flight segment, tracked by the engine so it can /// shrink the segment's range and observe its throughput for dynamic split. struct SegmentRuntimeState { @@ -29,22 +37,37 @@ started_at: std::time::Instant, start_byte: u64, initial_end: u64, + /// Set by the coordinator when the worker for this slot returns `Ok(_)`. + /// Completed slots stay in `active_segments` (instead of being cleared) + /// so `persist_split_meta` records their byte range with `completed: true` + /// — otherwise a crash right after a split would leave the resume meta + /// without any record that those bytes are already on disk.
+ completed: bool, } /// Pick the slowest active segment whose remaining range is large enough /// to benefit from a split. Returns the slot index and the byte at which /// to split (midpoint of the remaining range). fn pick_split_target( - segments: &[Option<SegmentRuntimeState>], + segments: &[SegmentRuntimeState], min_remaining_bytes: u64, ) -> Option<(usize, u64)> { let mut slowest: Option<(usize, f64, u64)> = None; - for (idx, slot) in segments.iter().enumerate() { - let Some(state) = slot else { continue }; + for (idx, state) in segments.iter().enumerate() { + if state.completed { + continue; + } if state.initial_end == u64::MAX { continue; // unbounded segments cannot be split } let downloaded = state.progress.load(Ordering::Relaxed); + if downloaded == 0 { + continue; // no throughput sample yet + } + let elapsed = state.started_at.elapsed(); + if elapsed < MIN_SPLIT_SAMPLE_DURATION { + continue; // worker hasn't run long enough to produce a meaningful bps + } let current_offset = state.start_byte.saturating_add(downloaded); if current_offset >= state.initial_end { continue; // already at end — completion event will fire shortly @@ -57,8 +80,7 @@ if split_at <= current_offset || split_at >= state.initial_end { continue; } - let elapsed = state.started_at.elapsed().as_secs_f64().max(1e-3); - let bps = downloaded as f64 / elapsed; + let bps = downloaded as f64 / elapsed.as_secs_f64().max(1e-3); match slowest { None => slowest = Some((idx, bps, split_at)), Some((_, prev_bps, _)) if bps < prev_bps => { @@ -79,19 +101,28 @@ async fn persist_split_meta( download_id: DownloadId, url: &str, total_size: u64, - active_segments: &[Option<SegmentRuntimeState>], + active_segments: &[SegmentRuntimeState], ) { + // Snapshot every slot — including completed ones — so a crash right + // after a split does not lose the record of byte ranges already on + // disk. Completed segments report their full range as downloaded so + // resume does not re-fetch them.
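+ // E.g. a completed slot that covered [0, 8 MiB) is serialized with + // downloaded_bytes = 8 MiB and completed: true, so resume skips it.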
let segments_meta: Vec<SegmentMeta> = active_segments .iter() .enumerate() - .filter_map(|(i, slot)| { - slot.as_ref().map(|st| SegmentMeta { + .map(|(i, st)| { + let downloaded = if st.completed { + st.initial_end.saturating_sub(st.start_byte) + } else { + st.progress.load(Ordering::Relaxed) + }; + SegmentMeta { id: i as u32, start_byte: st.start_byte, end_byte: st.initial_end, - downloaded_bytes: st.progress.load(Ordering::Relaxed), - completed: false, - }) + downloaded_bytes: downloaded, + completed: st.completed, + } }) .collect(); let now = std::time::SystemTime::now() @@ -421,16 +452,17 @@ impl DownloadEngine for SegmentedDownloadEngine { let shared_downloaded = Arc::new(AtomicU64::new(0)); let mut join_set: JoinSet<(usize, Result<u64, SegmentError>)> = JoinSet::new(); - let mut segment_state: Vec<SegmentRuntimeState> = Vec::with_capacity(segments.len()); + let mut active_segments: Vec<SegmentRuntimeState> = Vec::with_capacity(segments.len()); for (index, (start, end)) in segments.iter().enumerate() { let (end_tx, end_rx) = watch::channel(*end); let progress = Arc::new(AtomicU64::new(0)); - segment_state.push(SegmentRuntimeState { + active_segments.push(SegmentRuntimeState { end_tx, progress: progress.clone(), started_at: std::time::Instant::now(), start_byte: *start, initial_end: *end, + completed: false, }); let params = SegmentParams { client: client.clone(), @@ -456,16 +488,16 @@ impl DownloadEngine for SegmentedDownloadEngine { let mut failed = false; let mut error_msg = String::new(); let mut next_segment_id: u32 = segments.len() as u32; - let mut active_segments: Vec<Option<SegmentRuntimeState>> = - segment_state.into_iter().map(Some).collect(); while let Some(result) = join_set.join_next().await { match result { Ok((slot_idx, Ok(_bytes))) => { - // Clear the completed slot so pick_split_target ignores it - // and persist_split_meta reflects the live topology. + // Mark the slot completed instead of removing it — the + // persist_split_meta call below must record completed + // ranges so a crash mid-split does not lose the fact + // that those bytes are already on disk. if slot_idx < active_segments.len() { - active_segments[slot_idx] = None; + active_segments[slot_idx].completed = true; } if dynamic_split_enabled.load(Ordering::Relaxed) @@ -481,18 +513,11 @@ impl DownloadEngine for SegmentedDownloadEngine { // subsequent pick_split_target on the same slot — or a // crash recovery via persist_split_meta — observes the // shrunk range, not the pre-split end.
- let (initial_end, signal_sent) = { - let old_state = active_segments[idx] - .as_mut() - .expect("slot present at split time"); - let initial_end = old_state.initial_end; - let signal_sent = old_state.end_tx.send(split_at).is_ok(); - if signal_sent { - old_state.initial_end = split_at; - } - (initial_end, signal_sent) - }; - if !signal_sent { + let initial_end = active_segments[idx].initial_end; + let signal_sent = active_segments[idx].end_tx.send(split_at).is_ok(); + if signal_sent { + active_segments[idx].initial_end = split_at; + } else { tracing::warn!( download_id = download_id.0, original_segment_id = idx as u32, @@ -530,13 +555,14 @@ impl DownloadEngine for SegmentedDownloadEngine { join_set.spawn(async move { (new_slot_idx, download_segment(params).await) }); - active_segments.push(Some(SegmentRuntimeState { + active_segments.push(SegmentRuntimeState { end_tx: new_end_tx, progress: new_progress, started_at: std::time::Instant::now(), start_byte: split_at, initial_end, - })); + completed: false, + }); persist_split_meta( &file_storage, @@ -549,10 +575,11 @@ impl DownloadEngine for SegmentedDownloadEngine { .await; } } - Ok((slot_idx, Err(e))) => { - if slot_idx < active_segments.len() { - active_segments[slot_idx] = None; - } + Ok((_slot_idx, Err(e))) => { + // Errored slots stay in active_segments without + // `completed = true`; pick_split_target ignores them + // because the worker is gone (end_tx send fails) and + // the cancel below tears the whole download down. match e { SegmentError::Cancelled => { cancel_token.cancel(); @@ -1113,22 +1140,21 @@ mod tests { #[test] fn test_pick_split_target_prefers_slowest_above_threshold() { - let make = |start: u64, end: u64, downloaded: u64, age_ms: u64| { - Some(SegmentRuntimeState { - end_tx: watch::channel(end).0, - progress: Arc::new(AtomicU64::new(downloaded)), - started_at: std::time::Instant::now() - std::time::Duration::from_millis(age_ms), - start_byte: start, - initial_end: end, - }) + let make = |start: u64, end: u64, downloaded: u64, age_ms: u64| SegmentRuntimeState { + end_tx: watch::channel(end).0, + progress: Arc::new(AtomicU64::new(downloaded)), + started_at: std::time::Instant::now() - std::time::Duration::from_millis(age_ms), + start_byte: start, + initial_end: end, + completed: false, }; let segs = [ - // fast: 1 MiB downloaded in 100 ms → 10 MiB/s - make(0, 16 * 1024 * 1024, 1024 * 1024, 100), + // fast: 1 MiB downloaded in 1500 ms → ~700 KiB/s + make(0, 16 * 1024 * 1024, 1024 * 1024, 1500), // slow: 100 KiB in 1000 ms → ~100 KiB/s, plenty of remaining make(16 * 1024 * 1024, 32 * 1024 * 1024, 100 * 1024, 1000), // tiny remaining → must be filtered - make(32 * 1024 * 1024, 32 * 1024 * 1024 + 1024, 512, 200), + make(32 * 1024 * 1024, 32 * 1024 * 1024 + 1024, 512, 600), ]; let pick = pick_split_target(&segs, 4 * 1024 * 1024); assert_eq!( @@ -1149,20 +1175,66 @@ mod tests { #[test] fn test_pick_split_target_returns_none_when_all_below_threshold() { - let make = |start: u64, end: u64, downloaded: u64| { - Some(SegmentRuntimeState { - end_tx: watch::channel(end).0, - progress: Arc::new(AtomicU64::new(downloaded)), - started_at: std::time::Instant::now(), - start_byte: start, - initial_end: end, - }) + let make = |start: u64, end: u64, downloaded: u64| SegmentRuntimeState { + end_tx: watch::channel(end).0, + progress: Arc::new(AtomicU64::new(downloaded)), + started_at: std::time::Instant::now() - std::time::Duration::from_millis(800), + start_byte: start, + initial_end: end, + completed: false, }; - let segs = [make(0, 1024, 
100), make(1024, 2048, 0), make(2048, 3072, 0)]; + let segs = [make(0, 1024, 100), make(1024, 2048, 1), make(2048, 3072, 1)]; let pick = pick_split_target(&segs, 4 * 1024 * 1024); assert!(pick.is_none(), "got {pick:?}"); } + #[test] + fn test_pick_split_target_skips_fresh_segments() { + // Brand-new split children should not be candidates: no throughput + // sample yet (downloaded == 0) and elapsed below MIN_SPLIT_SAMPLE_DURATION. + // A genuinely slow neighbor (1000 ms / 100 KiB) sits next to them. + let mk = |start: u64, end: u64, downloaded: u64, age_ms: u64| SegmentRuntimeState { + end_tx: watch::channel(end).0, + progress: Arc::new(AtomicU64::new(downloaded)), + started_at: std::time::Instant::now() - std::time::Duration::from_millis(age_ms), + start_byte: start, + initial_end: end, + completed: false, + }; + let segs = [ + // fresh child: 0 bytes, 50 ms — must be skipped despite being "slowest" + mk(0, 16 * 1024 * 1024, 0, 50), + // slightly older but still no sample — must be skipped + mk(16 * 1024 * 1024, 32 * 1024 * 1024, 0, 200), + // genuinely slow but mature + mk(32 * 1024 * 1024, 48 * 1024 * 1024, 100 * 1024, 1000), + ]; + let pick = pick_split_target(&segs, 4 * 1024 * 1024); + assert_eq!(pick.map(|(i, _)| i), Some(2), "got {pick:?}"); + } + + #[test] + fn test_pick_split_target_skips_completed_segments() { + // A completed slot must never be picked even if its throughput was the + // slowest before completion. + let mk = |start: u64, end: u64, downloaded: u64, completed: bool| SegmentRuntimeState { + end_tx: watch::channel(end).0, + progress: Arc::new(AtomicU64::new(downloaded)), + started_at: std::time::Instant::now() - std::time::Duration::from_millis(1000), + start_byte: start, + initial_end: end, + completed, + }; + let segs = [ + // completed slow segment — must be ignored + mk(0, 16 * 1024 * 1024, 16 * 1024 * 1024, true), + // live, slower in absolute terms but only it is eligible + mk(16 * 1024 * 1024, 32 * 1024 * 1024, 100 * 1024, false), + ]; + let pick = pick_split_target(&segs, 4 * 1024 * 1024); + assert_eq!(pick.map(|(i, _)| i), Some(1)); + } + #[tokio::test] async fn test_pause_unknown_id_returns_not_found() { let storage = Arc::new(MockFileStorage::new()); From 74c3c91f371a7b380bddc803de2f30fe0dead97f Mon Sep 17 00:00:00 2001 From: Mathieu Piton <27002047+mpiton@users.noreply.github.com> Date: Sun, 26 Apr 2026 16:46:26 +0200 Subject: [PATCH 5/6] chore: document MSRV as rust-version = "1.85" in src-tauri/Cargo.toml Edition 2024 has been stable since Rust 1.85 (Feb 2025) and this project builds against rustc 1.95, but the MSRV was undocumented. Adding rust-version makes the floor explicit for tooling and contributors. --- src-tauri/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 2729d08..640509b 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -4,6 +4,7 @@ version = "0.2.0" description = "A desktop download manager" authors = ["mpiton"] edition = "2024" +rust-version = "1.85" license = "GPL-3.0-only" [lib] From 94c36344e085236b0ccd8d403fb5e4dbdd2fc241 Mon Sep 17 00:00:00 2001 From: Mathieu Piton <27002047+mpiton@users.noreply.github.com> Date: Sun, 26 Apr 2026 16:56:14 +0200 Subject: [PATCH 6/6] chore: pin rust-version to 1.95 to match the toolchain in use Previous commit set MSRV to 1.85 (the edition 2024 floor). 
Pin to the actual toolchain version used for development and CI so contributors don't accidentally build with an older rustc that, while technically supporting edition 2024, lacks newer language and stdlib features the codebase relies on (e.g. the `&& let` chains in the download engine, stable only since 1.88). --- src-tauri/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 640509b..9ae17c2 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -4,7 +4,7 @@ version = "0.2.0" description = "A desktop download manager" authors = ["mpiton"] edition = "2024" -rust-version = "1.85" +rust-version = "1.95" license = "GPL-3.0-only" [lib]