From 727ae1cf3699fa56773ad39cb4af335b419e3be5 Mon Sep 17 00:00:00 2001 From: Takeru Ohta Date: Fri, 13 Feb 2026 19:47:11 +0900 Subject: [PATCH 1/4] chore: replace rayon with custom thread pool implementation --- Cargo.lock | 27 ------------ Cargo.toml | 1 - src/main.rs | 122 ++++++++++++++++++++++++++++++++++------------------ 3 files changed, 79 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 531b88c..0ea56aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,7 +78,6 @@ dependencies = [ "ignore", "log", "noargs", - "rayon", "regex", "similar", "similar-asserts", @@ -112,12 +111,6 @@ dependencies = [ "efmt_core", ] -[[package]] -name = "either" -version = "1.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" - [[package]] name = "encode_unicode" version = "1.0.0" @@ -216,26 +209,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rayon" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" -dependencies = [ - "either", - "rayon-core", -] - -[[package]] -name = "rayon-core" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" -dependencies = [ - "crossbeam-deque", - "crossbeam-utils", -] - [[package]] name = "regex" version = "1.12.3" diff --git a/Cargo.toml b/Cargo.toml index a2ee3d7..1058d1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ exclude = ["/rebar3_efmt/", "efmt_wasm.wasm", "/vscode/"] erl_tokenize = "0.10.0" efmt_core = { path = "efmt_core", version = "0.7.0" } log = "0.4" -rayon = "1" similar = { version= "2", features = ["inline"] } regex = "1.6.0" colored = "3" diff --git a/src/main.rs b/src/main.rs index 143c601..85672b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,10 @@ use efmt::files::RebarConfigValue; use efmt_core::items::ModuleOrConfig; -use rayon::iter::{IntoParallelIterator as _, ParallelIterator}; use regex::Regex; use std::io::Read as _; use std::path::{Path, PathBuf}; +use std::sync::Mutex; +use std::sync::atomic::{AtomicUsize, Ordering}; use unicode_width::UnicodeWidthStr; type Result = efmt::Result; @@ -369,6 +370,69 @@ fn format_file_or_stdin>( Ok((original, formatted)) } +fn contains_stdin_path(files: &[PathBuf]) -> bool { + files.iter().any(|file| file.to_str() == Some("-")) +} + +fn should_run_in_parallel(files: &[PathBuf], requested_parallel: bool) -> bool { + if !requested_parallel { + return false; + } + if contains_stdin_path(files) { + log::warn!("parallel execution is disabled when input contains '-' (stdin)"); + return false; + } + true +} + +fn filter_target_files(files: &[PathBuf], parallel: bool, predicate: F) -> Vec +where + F: Fn(&Path) -> bool + Sync, +{ + if !parallel || files.len() <= 1 { + return files + .iter() + .filter(|file| predicate(file)) + .cloned() + .collect::>(); + } + + let workers = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + .min(files.len()); + let next_index = AtomicUsize::new(0); + let selected_indices = Mutex::new(Vec::new()); + + std::thread::scope(|scope| { + for _ in 0..workers { + scope.spawn(|| { + loop { + let index = next_index.fetch_add(1, Ordering::Relaxed); + if index >= files.len() { + break; + } + if predicate(&files[index]) { + selected_indices + .lock() + .unwrap_or_else(|e| e.into_inner()) + .push(index); + } + } + }); + } + }); + + let mut selected_indices = selected_indices + .into_inner() + .unwrap_or_else(|e| e.into_inner()); + selected_indices.sort_unstable(); + selected_indices + .into_iter() + .map(|index| files[index].clone()) + .collect::>() +} + fn format_files(opt: &Opt) -> crate::Result<()> { let format_options = opt.to_format_options(); @@ -399,19 +463,10 @@ fn format_files(opt: &Opt) -> crate::Result<()> { } } - let error_files = if opt.parallel { - opt.files - .clone() - .into_par_iter() - .filter(|file| do_format(opt, &format_options, file).is_err()) - .collect::>() - } else { - opt.files - .iter() - .filter(|file| do_format(opt, &format_options, file).is_err()) - .cloned() - .collect::>() - }; + let run_parallel = should_run_in_parallel(&opt.files, opt.parallel); + let error_files = filter_target_files(&opt.files, run_parallel, |file| { + do_format(opt, &format_options, file).is_err() + }); if !error_files.is_empty() { if opt.files.len() > 1 { @@ -504,35 +559,16 @@ fn check_files(opt: &Opt) -> crate::Result<()> { } } - let unformatted_files = if opt.parallel { - opt.files - .clone() - .into_par_iter() - .filter(|file| { - !do_check( - &format_options, - file, - opt.allow_partial_failure, - opt.color, - opt.check_line_length, - ) - }) - .collect::>() - } else { - opt.files - .iter() - .filter(|file| { - !do_check( - &format_options, - file, - opt.allow_partial_failure, - opt.color, - opt.check_line_length, - ) - }) - .cloned() - .collect::>() - }; + let run_parallel = should_run_in_parallel(&opt.files, opt.parallel); + let unformatted_files = filter_target_files(&opt.files, run_parallel, |file| { + !do_check( + &format_options, + file, + opt.allow_partial_failure, + opt.color, + opt.check_line_length, + ) + }); if !unformatted_files.is_empty() { eprintln!(); From 5d336687118c9f7f8f058639748a1327828bef3c Mon Sep 17 00:00:00 2001 From: Takeru Ohta Date: Fri, 13 Feb 2026 19:47:52 +0900 Subject: [PATCH 2/4] docs: add comment explaining Sync requirement for predicate --- src/main.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main.rs b/src/main.rs index 85672b9..d09f50d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -385,6 +385,8 @@ fn should_run_in_parallel(files: &[PathBuf], requested_parallel: bool) -> bool { true } +// `predicate` is shared by reference across worker threads when `parallel` is true, +// so `F: Sync` is required. fn filter_target_files(files: &[PathBuf], parallel: bool, predicate: F) -> Vec where F: Fn(&Path) -> bool + Sync, From 0bf60a2f5fc4c541406d743ee550cbcc1d9ffa1a Mon Sep 17 00:00:00 2001 From: Takeru Ohta Date: Fri, 13 Feb 2026 19:58:05 +0900 Subject: [PATCH 3/4] refactor: replace Mutex with mpsc channel for thread-safe collection --- src/main.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/main.rs b/src/main.rs index d09f50d..8c0be1c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,6 @@ use efmt_core::items::ModuleOrConfig; use regex::Regex; use std::io::Read as _; use std::path::{Path, PathBuf}; -use std::sync::Mutex; use std::sync::atomic::{AtomicUsize, Ordering}; use unicode_width::UnicodeWidthStr; @@ -404,30 +403,29 @@ where .unwrap_or(1) .min(files.len()); let next_index = AtomicUsize::new(0); - let selected_indices = Mutex::new(Vec::new()); + let (sender, receiver) = std::sync::mpsc::channel::(); + let next_index = &next_index; + let predicate = &predicate; std::thread::scope(|scope| { for _ in 0..workers { - scope.spawn(|| { + let sender = sender.clone(); + scope.spawn(move || { loop { let index = next_index.fetch_add(1, Ordering::Relaxed); if index >= files.len() { break; } if predicate(&files[index]) { - selected_indices - .lock() - .unwrap_or_else(|e| e.into_inner()) - .push(index); + let _ = sender.send(index); } } }); } }); + drop(sender); - let mut selected_indices = selected_indices - .into_inner() - .unwrap_or_else(|e| e.into_inner()); + let mut selected_indices = receiver.into_iter().collect::>(); selected_indices.sort_unstable(); selected_indices .into_iter() From 2badba69ffa1e201d1a430bfd47fa3c29dfde904 Mon Sep 17 00:00:00 2001 From: Takeru Ohta Date: Fri, 13 Feb 2026 20:04:24 +0900 Subject: [PATCH 4/4] refactor: remove unnecessary variable captures in thread scope --- src/main.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index 8c0be1c..3e420e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -404,13 +404,10 @@ where .min(files.len()); let next_index = AtomicUsize::new(0); let (sender, receiver) = std::sync::mpsc::channel::(); - let next_index = &next_index; - let predicate = &predicate; std::thread::scope(|scope| { for _ in 0..workers { - let sender = sender.clone(); - scope.spawn(move || { + scope.spawn(|| { loop { let index = next_index.fetch_add(1, Ordering::Relaxed); if index >= files.len() {