Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ let lines = Exec::cmd("sqlite3")
Create pipelines using the `|` operator:

```rust
let top_mem = (Exec::cmd("ps").args(&["aux"])
| Exec::cmd("sort").args(&["-k4", "-rn"])
let top_mem = (Exec::cmd("ps").args(["aux"])
| Exec::cmd("sort").args(["-k4", "-rn"])
| Exec::cmd("head").arg("-5"))
.capture()?
.stdout_str();
Expand Down
4 changes: 2 additions & 2 deletions examples/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use subprocess::Exec;

fn main() -> std::io::Result<()> {
// Simple pipeline: generate data, transform it, capture output
let data = (Exec::cmd("echo").args(&["cherry", "apple", "banana"])
| Exec::cmd("tr").args(&[" ", "\n"])
let data = (Exec::cmd("echo").args(["cherry", "apple", "banana"])
| Exec::cmd("tr").args([" ", "\n"])
| Exec::cmd("sort"))
.capture()?
.stdout_str();
Expand Down
14 changes: 9 additions & 5 deletions src/communicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,12 +435,13 @@ use win32::RawCommunicator;
/// serially, our process could block waiting to write input while the subprocess blocks
/// waiting for us to read its output, or vice versa.
///
/// Create a `Communicator` by calling [`Job::communicate`], then call [`read`] or
/// [`read_string`] to perform the data exchange.
/// Create a `Communicator` by calling [`Job::communicate`], then call [`read`],
/// [`read_string`], or [`read_to`] to perform the data exchange.
///
/// [`Job::communicate`]: crate::Job::communicate
/// [`read`]: #method.read
/// [`read_string`]: #method.read_string
/// [`read_to`]: #method.read_to
#[must_use]
pub struct Communicator {
inner: RawCommunicator,
Expand Down Expand Up @@ -539,7 +540,7 @@ impl Communicator {
///
/// Note that this method does not wait for the subprocess to finish, only to close its
/// output/error streams. It is rare but possible for the program to continue running
/// after having closed the streams, in which case `Process::Drop` will wait for it
/// after having closed the streams, in which case `Process::drop()` will wait for it
/// to finish. If such a wait is undesirable, it can be prevented by waiting
/// explicitly using `wait()`, by detaching the process using `detach()`, or by
/// terminating it with `terminate()`.
Expand All @@ -564,7 +565,10 @@ impl Communicator {
Ok((from_utf8_lossy(out), from_utf8_lossy(err)))
}

/// Limit the amount of data the next `read()` will read from the subprocess.
/// Limit the amount of data each `read()` will read from the subprocess.
///
/// The limit applies per call and persists across subsequent reads, so any data
/// beyond the limit is left buffered for the next `read()` to retrieve.
///
/// On Windows, when capturing both stdout and stderr, the limit is approximate
/// and may be exceeded by several kilobytes.
Expand All @@ -573,7 +577,7 @@ impl Communicator {
self
}

/// Limit the amount of time the next `read()` will spend reading from the subprocess.
/// Limit the amount of time each `read()` will spend reading from the subprocess.
pub fn limit_time(mut self, time: Duration) -> Communicator {
self.time_limit = Some(time);
self
Expand Down
14 changes: 9 additions & 5 deletions src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ impl Exec {
/// code is prone to errors and, if `filename` comes from an untrusted source, to
/// shell injection attacks. Instead, use `Exec::cmd("sort").arg(filename)`.
pub fn shell(cmdstr: impl Into<OsString>) -> Exec {
let cmd = Exec::cmd(SHELL[0]).args(&SHELL[1..]);
let [shell, flag] = SHELL;
let cmd = Exec::cmd(shell).arg(flag);
#[cfg(not(windows))]
{
cmd.arg(cmdstr)
Expand Down Expand Up @@ -331,9 +332,9 @@ impl Exec {
///
/// * a [`Redirection`];
/// * a `File`, which is a shorthand for `Redirection::File(file)`;
/// * a `Vec<u8>`, `&str`, `&[u8]`, `Box<[u8]>`, or `[u8; N]`, which will set up a
/// `Redirection::Pipe` for stdin, feeding that data into the standard input of the
/// subprocess;
/// * a `Vec<u8>`, `&'static str`, `&'static [u8]`, `Box<[u8]>`, or `[u8; N]`, which
/// will set up a `Redirection::Pipe` for stdin, feeding that data into the standard
/// input of the subprocess;
/// * an [`InputData`], which also sets up a pipe, but wraps any reader and feeds its
/// content to the standard input of the subprocess. Use [`InputData::from_bytes`]
/// for in-memory byte containers not covered by the above, like `bytes::Bytes` or
Expand Down Expand Up @@ -543,7 +544,10 @@ impl Exec {
self.start()?.capture()
}

/// Show Exec as command-line string quoted in the Unix style.
/// Show `Exec` as a command-line string using POSIX shell quoting.
///
/// The output uses POSIX shell quoting rules on all platforms, including Windows;
/// it is intended for display and debugging, not for re-execution by `cmd.exe`.
pub fn to_cmdline_lossy(&self) -> String {
let mut out = String::new();
if let Some(cmd_env) = &self.env {
Expand Down
4 changes: 2 additions & 2 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ impl Job {

/// Returns the PIDs of all processes in the pipeline, in pipeline order.
///
/// If the job was started by a single process, this will return its pid. It will be
/// empty for a job started by an empty pipeline.
/// If the job was started from a single command (`Exec`), this returns one PID. It
/// will be empty for a job started by an empty pipeline.
pub fn pids(&self) -> Vec<u32> {
self.processes.iter().map(|p| p.pid()).collect()
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
//! # }
//! ```
//!
//! Use the [`Exec`] builder to execute a pipeline of commands and capture the output:
//! Combine [`Exec`] instances with `|` to build a [`Pipeline`] and capture its output:
//!
//! ```no_run
//! # use subprocess::*;
Expand Down
13 changes: 6 additions & 7 deletions src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ impl Pipeline {
///
/// * a [`Redirection`];
/// * a `File`, which is a shorthand for `Redirection::File(file)`;
/// * a `Vec<u8>`, `&str`, `&[u8]`, `Box<[u8]>`, or `[u8; N]`, which will set up a
/// `Redirection::Pipe` for stdin, feeding that data into the standard input of the
/// subprocess;
/// * a `Vec<u8>`, `&'static str`, `&'static [u8]`, `Box<[u8]>`, or `[u8; N]`, which
/// will set up a `Redirection::Pipe` for stdin, feeding that data into the standard
/// input of the subprocess;
/// * an [`InputData`], which also sets up a pipe, but wraps any reader and feeds its
/// content to the standard input of the subprocess. Use [`InputData::from_bytes`]
/// for in-memory byte containers not covered by the above, like `bytes::Bytes` or
Expand Down Expand Up @@ -188,10 +188,9 @@ impl Pipeline {
/// Specifies the sink for the standard error of all commands in the pipeline.
///
/// Unlike `stdout()`, which only affects the last command in the pipeline, this
/// affects all commands. The difference is because standard output is piped from one
/// command to the next, so only the output of the last command is "free". In
/// contrast, the standard errors are not connected to each other and can be
/// configured *en masse*.
/// affects all commands. This is because standard output is piped from one command
/// to the next, so only the output of the last command is "free". In contrast, the
/// standard errors are not connected to each other and can be configured *en masse*.
///
/// The sink can be:
///
Expand Down
86 changes: 71 additions & 15 deletions src/posix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,25 @@ pub fn pipe() -> Result<(File, File)> {
/// Creates an anonymous pipe and returns its `(read, write)` ends with
/// `FD_CLOEXEC` set on both.
///
/// # Errors
///
/// Returns the error reported by `check_err` if `pipe()` or either `fcntl()`
/// call fails. Both descriptors are closed before an `fcntl()` error is
/// returned.
pub fn pipe() -> Result<(File, File)> {
    let mut fds = [0; 2];
    // SAFETY: `fds` is a valid, writable array of two ints, which is what pipe() expects.
    check_err(unsafe { libc::pipe(fds.as_mut_ptr()) })?;
    // Wrap in File immediately so an fcntl failure below closes both fds via Drop
    // instead of leaking them.
    // SAFETY: pipe() succeeded, so both fds are open and not owned by anything else;
    // each one is wrapped exactly once.
    let (read, write) = unsafe { (File::from_raw_fd(fds[0]), File::from_raw_fd(fds[1])) };
    // Set CLOEXEC on both ends. There is a small race window between pipe() and fcntl()
    // where another thread's fork()+exec() could inherit these fds - this matches what
    // Rust stdlib does on these platforms that lack pipe2().
    for fd in fds {
        // SAFETY: `fd` is kept open by `read`/`write`, which are still alive here.
        check_err(unsafe { libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) })?;
    }
    Ok((read, write))
}

// marked unsafe because the child must not allocate before exec-ing
Expand Down Expand Up @@ -188,7 +199,10 @@ impl PrepExec {
let mut exe = std::mem::ManuallyDrop::new(std::mem::take(&mut this.prealloc_exe));

if let Some(ref search_path) = this.search_path {
let mut err = Ok(());
// Default to ENOENT so a PATH with no usable entries (e.g. ":::") yields
// a real error instead of Ok(()) - the caller would otherwise hit
// unreachable!() in the child after fork.
let mut err = Err(Error::from_raw_os_error(libc::ENOENT));
// POSIX requires execvp and execve, but not execvpe (although glibc provides
// one), so we have to iterate over PATH ourselves
for dir in split_path(search_path.as_os_str()) {
Expand Down Expand Up @@ -297,6 +311,35 @@ pub fn waitpid(pid: u32, flags: i32) -> Result<(u32, ExitStatus)> {
Ok((pid as u32, ExitStatus::from_raw(status)))
}

/// Waits for the child `pid` to exit without reaping it.
///
/// The wait is done through `waitid` with `WEXITED | WNOWAIT`, so the exited child is
/// left as a zombie and its PID stays tied to it until a later `waitpid` collects it.
/// A signal sent between this call returning and that reap therefore reaches the
/// original child (a no-op on a zombie) and never a recycled PID.
///
/// A call interrupted with `EINTR` is retried; any other failure is returned.
pub fn wait_no_reap(pid: u32) -> Result<()> {
    let flags = libc::WEXITED | libc::WNOWAIT;
    loop {
        // SAFETY: `siginfo_t` is a plain C struct for which all-zero bytes are valid.
        let mut siginfo: libc::siginfo_t = unsafe { mem::zeroed() };
        // SAFETY: `siginfo` is a live, writable `siginfo_t` for the duration of the call.
        let rc = unsafe { libc::waitid(libc::P_PID, pid as libc::id_t, &mut siginfo, flags) };
        if rc != 0 {
            let os_err = Error::last_os_error();
            match os_err.raw_os_error() {
                // Interrupted by a signal handler: issue the wait again.
                Some(libc::EINTR) => continue,
                _ => return Err(os_err),
            }
        }
        return Ok(());
    }
}

#[cfg(target_os = "linux")]
pub fn pidfd_open(pid: u32) -> Result<std::os::unix::io::OwnedFd> {
let fd =
Expand All @@ -318,6 +361,7 @@ pub fn killpg(pgid: u32, signal: i32) -> Result<()> {

pub const F_GETFD: i32 = libc::F_GETFD;
pub const F_SETFD: i32 = libc::F_SETFD;
pub const F_DUPFD_CLOEXEC: i32 = libc::F_DUPFD_CLOEXEC;
pub const FD_CLOEXEC: i32 = libc::FD_CLOEXEC;

pub fn fcntl(fd: i32, cmd: i32, arg1: Option<i32>) -> Result<i32> {
Expand Down Expand Up @@ -393,11 +437,12 @@ impl PollFd<'_> {
pub use libc::{POLLHUP, POLLIN, POLLOUT};

pub fn poll(fds: &mut [PollFd<'_>], mut timeout: Option<Duration>) -> Result<usize> {
// The loop handles two cases:
// - poll() accepts a maximum timeout of 2**31-1 ms (less than 25 days), and the
// caller can specify larger Durations.
// - poll() fails with EINTR when interrupted by a signal handler.
let deadline = timeout.map(|timeout| Instant::now() + timeout);
loop {
// poll() accepts a maximum timeout of 2**31-1 ms, which is less than 25 days.
// The caller can specify Durations much larger than that, so support them by
// waiting in a loop.
let (timeout_ms, overflow) = timeout
.map(|timeout| {
let timeout = timeout.as_millis();
Expand All @@ -409,17 +454,28 @@ pub fn poll(fds: &mut [PollFd<'_>], mut timeout: Option<Duration>) -> Result<usi
})
.unwrap_or((-1, false));
let fds_ptr = fds.as_ptr() as *mut libc::pollfd;
let cnt = unsafe { check_err(libc::poll(fds_ptr, fds.len() as libc::nfds_t, timeout_ms))? };
if cnt != 0 || !overflow {
return Ok(cnt as usize);
let raw = unsafe { libc::poll(fds_ptr, fds.len() as libc::nfds_t, timeout_ms) };
if raw >= 0 {
let cnt = raw as usize;
if cnt != 0 || !overflow {
return Ok(cnt);
}
// Timeout fired on the clamped value; loop to wait the rest.
} else {
let err = Error::last_os_error();
if err.raw_os_error() != Some(libc::EINTR) {
return Err(err);
}
// Interrupted by a signal; loop and retry.
}

let deadline = deadline.unwrap();
let now = Instant::now();
if now >= deadline {
return Ok(0);
if let Some(deadline) = deadline {
let now = Instant::now();
if now >= deadline {
return Ok(0);
}
timeout = Some(deadline - now);
}
timeout = Some(deadline - now);
}
}

Expand Down
Loading
Loading