From cdfb75d8d73e9b4a7a96208ed66ddac9b74516e9 Mon Sep 17 00:00:00 2001 From: Jules Ivanic Date: Wed, 14 May 2025 17:53:53 +1000 Subject: [PATCH 1/2] Make the command execution interruptable --- .../zio/process/CommandPlatformSpecific.scala | 78 ++++++++++++------- 1 file changed, 48 insertions(+), 30 deletions(-) diff --git a/zio-process/jvm/src/main/scala/zio/process/CommandPlatformSpecific.scala b/zio-process/jvm/src/main/scala/zio/process/CommandPlatformSpecific.scala index 86d48d26..cbbac5f1 100644 --- a/zio-process/jvm/src/main/scala/zio/process/CommandPlatformSpecific.scala +++ b/zio-process/jvm/src/main/scala/zio/process/CommandPlatformSpecific.scala @@ -15,18 +15,20 @@ */ package zio.process + import scala.annotation.nowarn import FilePlatformSpecific._ -import zio.NonEmptyChunk +import zio.{Chunk, Exit, NonEmptyChunk, Ref, UIO, Unsafe, ZIO} + import java.lang.ProcessBuilder.Redirect import scala.jdk.CollectionConverters._ -import zio.ZIO import java.io.OutputStream import zio.stream.ZSink -import zio.Chunk private[process] trait CommandPlatformSpecific { + type JProcess = java.lang.Process + @nowarn protected def checkDirectory(dir: File): Boolean = true @@ -34,39 +36,55 @@ private[process] trait CommandPlatformSpecific { @nowarn protected def build(c: Command.Standard, piping: Option[java.io.InputStream]): ZIO[Any, Throwable, Process] = - ZIO.attempt { - val builder = new ProcessBuilder(adaptCommand(c.command): _*) - builder.redirectErrorStream(c.redirectErrorStream) - c.workingDirectory.foreach { dir => - if (!checkDirectory(dir)) throw CommandError.WorkingDirectoryMissing(dir) - builder.directory(dir) - } + ZIO.asyncInterrupt { cb => + val ref: Ref.Atomic[JProcess] = Ref.unsafe.make[JProcess](null)(Unsafe.unsafe) - if (c.env.nonEmpty) { - builder.environment().putAll(c.env.asJava) - } + def unsafeRunCmd: Process = { + val builder = new ProcessBuilder(adaptCommand(c.command): _*) + builder.redirectErrorStream(c.redirectErrorStream) + c.workingDirectory.foreach { dir => + if (!checkDirectory(dir)) throw CommandError.WorkingDirectoryMissing(dir) + builder.directory(dir) + } - c.stdin match { - case ProcessInput.Inherit => builder.redirectInput(Redirect.INHERIT) - case ProcessInput.Pipe => builder.redirectInput(Redirect.PIPE) - case _ => () - } + if (c.env.nonEmpty) { + builder.environment().putAll(c.env.asJava) + } - c.stdout match { - case ProcessOutput.FileRedirect(file) => builder.redirectOutput(Redirect.to(file)) - case ProcessOutput.FileAppendRedirect(file) => builder.redirectOutput(Redirect.appendTo(file)) - case ProcessOutput.Inherit => builder.redirectOutput(Redirect.INHERIT) - case ProcessOutput.Pipe => builder.redirectOutput(Redirect.PIPE) - } + c.stdin match { + case ProcessInput.Inherit => builder.redirectInput(Redirect.INHERIT) + case ProcessInput.Pipe => builder.redirectInput(Redirect.PIPE) + case _ => () + } + + c.stdout match { + case ProcessOutput.FileRedirect(file) => builder.redirectOutput(Redirect.to(file)) + case ProcessOutput.FileAppendRedirect(file) => builder.redirectOutput(Redirect.appendTo(file)) + case ProcessOutput.Inherit => builder.redirectOutput(Redirect.INHERIT) + case ProcessOutput.Pipe => builder.redirectOutput(Redirect.PIPE) + } + + c.stderr match { + case ProcessOutput.FileRedirect(file) => builder.redirectError(Redirect.to(file)) + case ProcessOutput.FileAppendRedirect(file) => builder.redirectError(Redirect.appendTo(file)) + case ProcessOutput.Inherit => builder.redirectError(Redirect.INHERIT) + case ProcessOutput.Pipe => builder.redirectError(Redirect.PIPE) + } - c.stderr match { - case ProcessOutput.FileRedirect(file) => builder.redirectError(Redirect.to(file)) - case ProcessOutput.FileAppendRedirect(file) => builder.redirectError(Redirect.appendTo(file)) - case ProcessOutput.Inherit => builder.redirectError(Redirect.INHERIT) - case ProcessOutput.Pipe => builder.redirectError(Redirect.PIPE) + val jProcess = builder.start() + ref.unsafe.set(jProcess)(Unsafe.unsafe) + Process(jProcess) } - Process(builder.start()) + val canceler: UIO[Unit] = + ref.get.flatMap { + case null => Exit.unit + case process => ZIO.attempt(process.destroy()).ignoreLogged + } + + cb(ZIO.attempt(unsafeRunCmd)) + + Left(canceler) } protected def connectStdin(process: Process, stdin: ProcessInput): ZIO[Any, CommandError, Unit] = From 797b81f535854a0681f10d3f3a850c843e0a22fa Mon Sep 17 00:00:00 2001 From: Jules Ivanic Date: Tue, 16 Dec 2025 21:34:51 +1100 Subject: [PATCH 2/2] WIP --- .../zio/process/CommandPlatformSpecific.scala | 100 +++++++++--------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/zio-process/jvm/src/main/scala/zio/process/CommandPlatformSpecific.scala b/zio-process/jvm/src/main/scala/zio/process/CommandPlatformSpecific.scala index cbbac5f1..7e32627b 100644 --- a/zio-process/jvm/src/main/scala/zio/process/CommandPlatformSpecific.scala +++ b/zio-process/jvm/src/main/scala/zio/process/CommandPlatformSpecific.scala @@ -15,15 +15,14 @@ */ package zio.process +import zio.process.FilePlatformSpecific._ +import zio.stream.ZSink +import zio.{ Chunk, NonEmptyChunk, Promise, UIO, ZIO } -import scala.annotation.nowarn -import FilePlatformSpecific._ -import zio.{Chunk, Exit, NonEmptyChunk, Ref, UIO, Unsafe, ZIO} - +import java.io.OutputStream import java.lang.ProcessBuilder.Redirect +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ -import java.io.OutputStream -import zio.stream.ZSink private[process] trait CommandPlatformSpecific { @@ -36,55 +35,56 @@ private[process] trait CommandPlatformSpecific { @nowarn protected def build(c: Command.Standard, piping: Option[java.io.InputStream]): ZIO[Any, Throwable, Process] = - ZIO.asyncInterrupt { cb => - val ref: Ref.Atomic[JProcess] = Ref.unsafe.make[JProcess](null)(Unsafe.unsafe) - - def unsafeRunCmd: Process = { - val builder = new ProcessBuilder(adaptCommand(c.command): _*) - builder.redirectErrorStream(c.redirectErrorStream) - c.workingDirectory.foreach { dir => - if (!checkDirectory(dir)) throw CommandError.WorkingDirectoryMissing(dir) - builder.directory(dir) + ZIO.fiberIdWith { fiberId => + ZIO.asyncInterruptUnsafe { implicit unsafe => cb => + val processRef: Promise[Nothing, JProcess] = Promise.unsafe.make[Nothing, JProcess](fiberId) + + def unsafeRunCmd: Process = { + val builder = new ProcessBuilder(adaptCommand(c.command): _*) + builder.redirectErrorStream(c.redirectErrorStream) + c.workingDirectory.foreach { dir => + if (!checkDirectory(dir)) throw CommandError.WorkingDirectoryMissing(dir) + builder.directory(dir) + } + + if (c.env.nonEmpty) { + builder.environment().putAll(c.env.asJava) + } + + c.stdin match { + case ProcessInput.Inherit => builder.redirectInput(Redirect.INHERIT) + case ProcessInput.Pipe => builder.redirectInput(Redirect.PIPE) + case _ => () + } + + c.stdout match { + case ProcessOutput.FileRedirect(file) => builder.redirectOutput(Redirect.to(file)) + case ProcessOutput.FileAppendRedirect(file) => builder.redirectOutput(Redirect.appendTo(file)) + case ProcessOutput.Inherit => builder.redirectOutput(Redirect.INHERIT) + case ProcessOutput.Pipe => builder.redirectOutput(Redirect.PIPE) + } + + c.stderr match { + case ProcessOutput.FileRedirect(file) => builder.redirectError(Redirect.to(file)) + case ProcessOutput.FileAppendRedirect(file) => builder.redirectError(Redirect.appendTo(file)) + case ProcessOutput.Inherit => builder.redirectError(Redirect.INHERIT) + case ProcessOutput.Pipe => builder.redirectError(Redirect.PIPE) + } + + val jProcess = builder.start() + processRef.unsafe.succeed(jProcess) + Process(jProcess) } - if (c.env.nonEmpty) { - builder.environment().putAll(c.env.asJava) - } + val canceler: UIO[Unit] = + processRef.await.flatMap { process => + ZIO.attempt(process.destroy()).ignoreLogged + } - c.stdin match { - case ProcessInput.Inherit => builder.redirectInput(Redirect.INHERIT) - case ProcessInput.Pipe => builder.redirectInput(Redirect.PIPE) - case _ => () - } + cb(ZIO.attempt(unsafeRunCmd)) - c.stdout match { - case ProcessOutput.FileRedirect(file) => builder.redirectOutput(Redirect.to(file)) - case ProcessOutput.FileAppendRedirect(file) => builder.redirectOutput(Redirect.appendTo(file)) - case ProcessOutput.Inherit => builder.redirectOutput(Redirect.INHERIT) - case ProcessOutput.Pipe => builder.redirectOutput(Redirect.PIPE) - } - - c.stderr match { - case ProcessOutput.FileRedirect(file) => builder.redirectError(Redirect.to(file)) - case ProcessOutput.FileAppendRedirect(file) => builder.redirectError(Redirect.appendTo(file)) - case ProcessOutput.Inherit => builder.redirectError(Redirect.INHERIT) - case ProcessOutput.Pipe => builder.redirectError(Redirect.PIPE) - } - - val jProcess = builder.start() - ref.unsafe.set(jProcess)(Unsafe.unsafe) - Process(jProcess) + Left(canceler) } - - val canceler: UIO[Unit] = - ref.get.flatMap { - case null => Exit.unit - case process => ZIO.attempt(process.destroy()).ignoreLogged - } - - cb(ZIO.attempt(unsafeRunCmd)) - - Left(canceler) } protected def connectStdin(process: Process, stdin: ProcessInput): ZIO[Any, CommandError, Unit] =