Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Runtime/BuiltIns/ProcessBuiltIns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ public static partial class ProcessBuiltIns
"argv" => GetArgv(),
"exitCode" => (double)Environment.ExitCode,

// Stream objects
"stdin" => SharpTSStdin.Instance,
// Stream objects. Inside a worker thread, process.stdin resolves to that worker's
// isolated per-worker Readable (#1076) rather than the Console-reading singleton, so a
// worker never consumes the host terminal; the override is null on the main thread.
"stdin" => (object?)WorkerThreads.WorkerStdin ?? SharpTSStdin.Instance,
"stdout" => SharpTSStdout.Instance,
"stderr" => SharpTSStderr.Instance,

Expand Down
121 changes: 115 additions & 6 deletions Runtime/Types/SharpTSWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ public class SharpTSWorker : SharpTSEventEmitter, IDisposable
private readonly SharpTSReadable? _stderr;
private readonly object? _resourceLimits;

// Per-worker stdin (#1076): the parent→worker mirror of the #1003 stdout path.
// `_workerStdin` is the worker's own process.stdin Readable — always allocated and
// installed as a [ThreadStatic] override on the worker thread (RunWorkerScript) so a
// worker never falls through to the real Console-reading SharpTSStdin singleton. `_stdin`
// is the parent-facing Writable exposed as worker.stdin, created only when `stdin: true`;
// each write is marshaled across the thread boundary via `_parentToWorkerStdinQueue` and
// pushed into `_workerStdin` by the message poller (PumpStdin). A null queue item is the
// EOF sentinel (PushFromHost(null) => 'end'), enqueued by _stdin.end()'s final callback.
private readonly SharpTSReadable _workerStdin = new();
private readonly SharpTSWritable? _stdin;
private readonly BlockingCollection<object?> _parentToWorkerStdinQueue = new();

// Wall-clock start, used for the best-effort worker.performance.eventLoopUtilization()
// (#1004) — SharpTS has no precise idle/active loop accounting.
private readonly long _startTick = Environment.TickCount64;
Expand Down Expand Up @@ -154,7 +166,8 @@ public SharpTSWorker(string filename, object? options, Interpreter? parentInterp
// is marshalled onto the parent loop before delivery (the streams are only touched on
// the parent's thread). resourceLimits is stored and echoed on worker.resourceLimits —
// .NET exposes no per-thread V8 heap/stack sizing, so it cannot be enforced (#407 ceiling).
// stdin remains unsupported (no parent→worker process.stdin bridge yet).
// stdin: true (#1076) exposes worker.stdin as a Writable whose chunks are marshalled onto
// the worker loop and fed into the worker's process.stdin Readable (the reverse direction).
//
// The bag is a SharpTSObject in interpreter mode and a Dictionary<string, object?>
// (a compiled object literal) in compiled mode; ReadOption reads through both so
Expand All @@ -172,6 +185,8 @@ public SharpTSWorker(string filename, object? options, Interpreter? parentInterp
_stdout = new SharpTSReadable();
if (ReadOption(options, "stderr") is true)
_stderr = new SharpTSReadable();
if (ReadOption(options, "stdin") is true)
_stdin = CreateStdinWritable();
}

// Clone workerData for transfer to worker
Expand Down Expand Up @@ -319,6 +334,12 @@ private void WorkerThreadMain()
_parentToWorkerQueue.CompleteAdding();
_workerToParentQueue.CompleteAdding();

// Stop accepting parent→worker stdin: a late worker.stdin.write() after exit becomes a
// guarded no-op (#1076). Clear the thread-local stdin override for hygiene — the worker
// thread is about to die, but this keeps the [ThreadStatic] from lingering if reused.
_parentToWorkerStdinQueue.CompleteAdding();
WorkerThreads.WorkerStdin = null;

// Signal end-of-stream on the diverted stdio Readables (#1003). Scheduled after the
// data pushes (FIFO on the parent loop) so 'end' follows the last 'data'.
if (_stdout != null)
Expand Down Expand Up @@ -363,6 +384,12 @@ private void RunWorkerScript()
_workerInterpreter = interpreter;
interpreter.SetWorkerTerminationToken(_cts.Token);

// Isolate this worker's process.stdin (#1076): install the per-worker Readable as the
// thread-local override so guest `process.stdin` never resolves to the Console-reading
// singleton. Always done (even without `stdin: true`) so a worker never consumes the host
// terminal; it is only *fed* (via PumpStdin) when the parent passed `stdin: true`.
WorkerThreads.WorkerStdin = _workerStdin;

// Set up worker globals
SetupWorkerGlobals(interpreter);

Expand Down Expand Up @@ -854,13 +881,13 @@ public void Unref()
{
"threadId" => ThreadId,

// #1003: per-worker stdio + resourceLimits. stdout/stderr are Readable streams when
// `stdout`/`stderr: true` was passed (else null, matching the no-pipe default).
// resourceLimits echoes the passed object (stored, not enforced). stdin is not
// supported (no parent→worker process.stdin bridge) — null.
// #1003/#1076: per-worker stdio + resourceLimits. stdout/stderr are Readable streams
// when `stdout`/`stderr: true` was passed; stdin is a Writable when `stdin: true` was
// passed (else null, matching Node's no-pipe default). resourceLimits echoes the passed
// object (stored, not enforced).
"stdout" => _stdout is not null ? (object?)_stdout : null,
"stderr" => _stderr is not null ? (object?)_stderr : null,
"stdin" => null,
"stdin" => _stdin is not null ? (object?)_stdin : null,
"resourceLimits" => _resourceLimits ?? new SharpTSObject(new Dictionary<string, object?>()),

// #1004 introspection. getHeapSnapshot has no .NET equivalent (V8 snapshot format) —
Expand Down Expand Up @@ -910,6 +937,72 @@ public void Unref()
};
}

/// <summary>
/// Builds the parent-facing worker.stdin Writable (#1076). Each write enqueues its chunk,
/// and end() enqueues a null EOF sentinel, onto <see cref="_parentToWorkerStdinQueue"/>; the
/// message poller drains them into the worker's process.stdin via <see cref="PumpStdin"/>.
/// Mirrors the child_process stdin bridge (ChildProcessModuleInterpreter) — chunks cross the
/// thread boundary as data only, so guest listeners inside the worker run on the worker.
/// </summary>
private SharpTSWritable CreateStdinWritable()
{
var writable = new SharpTSWritable();

// write(chunk[, encoding][, cb]): queue the chunk for the worker, then signal completion
// so the Writable's backpressure/drain bookkeeping advances (mirror of child_process).
writable.SetWriteCallback(BuiltInMethod.CreateV2("write", 1, 3, (interp, _, wargs) =>
{
if (wargs.Length > 0 && !wargs[0].IsNull)
TryEnqueueStdin(wargs[0].ToObject());
if (wargs.Length > 2 && wargs[2].ToObject() is ISharpTSCallable cb)
cb.Call(interp, [null]);
else if (wargs.Length > 1 && wargs[1].ToObject() is ISharpTSCallable cb2)
cb2.Call(interp, [null]);
return RuntimeValue.True;
}));

// end()/final: push a null EOF sentinel so the worker's process.stdin emits 'end'.
writable.SetFinalCallback(BuiltInMethod.CreateV2("final", 0, 1, (interp, _, fargs) =>
{
TryEnqueueStdin(null); // null => EOF sentinel
if (fargs.Length > 0 && fargs[0].ToObject() is ISharpTSCallable done)
done.Call(interp, [null]);
return RuntimeValue.Undefined;
}));

return writable;
}

/// <summary>
/// Enqueues one parent→worker stdin item (chunk, or null EOF), swallowing the race where the
/// worker has already exited and completed the queue (mirror of PostMessage's guard).
/// </summary>
private void TryEnqueueStdin(object? item)
{
if (_isTerminated)
return;
try
{
_parentToWorkerStdinQueue.Add(item);
}
catch (InvalidOperationException)
{
// Queue completed — worker is terminating/exited.
}
}

/// <summary>
/// Drains queued parent→worker stdin chunks into the worker's process.stdin Readable.
/// Called on each poll tick (worker side) with the worker interpreter, so the pushes — and
/// the 'data'/'end' events they trigger — run against a live interpreter after guest code has
/// attached its listeners (same marshaling model as message delivery). A null item is EOF.
/// </summary>
internal void PumpStdin(Interpreter workerInterpreter)
{
while (_parentToWorkerStdinQueue.TryTake(out var chunk))
_workerStdin.PushFromHost(workerInterpreter, chunk);
}

/// <summary>
/// Checks for pending messages from parent (called by worker thread).
/// </summary>
Expand Down Expand Up @@ -938,6 +1031,7 @@ public void Dispose()
_cts.Dispose();
_parentToWorkerQueue.Dispose();
_workerToParentQueue.Dispose();
_parentToWorkerStdinQueue.Dispose();
GC.SuppressFinalize(this);
}

Expand Down Expand Up @@ -1019,6 +1113,11 @@ private void PollMessages(object? state)
if (_worker.IsTerminated)
return;

// Feed any parent→worker stdin chunks into this worker's process.stdin (#1076). Done on
// the same poll tick as message delivery so 'data'/'end' events run against the live worker
// interpreter after guest code has attached its listeners.
_worker.PumpStdin(_interpreter);

while (_worker.TryReceiveMessage(out var message))
{
if (message == null)
Expand Down Expand Up @@ -1060,6 +1159,16 @@ private void PollMessages(object? state)
/// </summary>
public static class WorkerThreads
{
/// <summary>
/// The current thread's per-worker process.stdin Readable (#1076). Set on a worker's
/// dedicated thread in <see cref="SharpTSWorker.RunWorkerScript"/> so that guest access to
/// <c>process.stdin</c> inside the worker resolves here (via ProcessBuiltIns) rather than to
/// the process-global, Console-reading <see cref="SharpTSStdin"/> singleton. Null on the main
/// thread and on pool threads → those keep the singleton. [ThreadStatic] mirrors ClusterContext.
/// </summary>
[ThreadStatic]
public static SharpTSReadable? WorkerStdin;

/// <summary>
/// Gets whether the current thread is the main thread.
/// </summary>
Expand Down
99 changes: 99 additions & 0 deletions SharpTS.Tests/SharedTests/WorkerThreadsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,105 @@ public void Worker_StdoutTrue_CapturesWorkerConsoleOutput(ExecutionMode mode)

#endregion

#region worker.stdin (#1076)

/// <summary>
/// #1076: with <c>stdin: true</c>, <c>worker.stdin</c> is a Writable on the parent whose
/// writes are bridged into the worker's <c>process.stdin</c> Readable (parent→worker, the
/// reverse of the #1003 stdout path). The worker reads the chunk via a 'data' listener and
/// <c>worker.stdin.end()</c> surfaces as 'end'. The parent writes only after the worker signals
/// it has attached its listeners, so the assertion doesn't race the listener setup. Dual-mode:
/// the worker always interprets; a compiled parent reaches <c>worker.stdin</c> via runtime
/// dispatch on the C# SharpTSWorker (like worker.stdout).
/// </summary>
[Theory]
[MemberData(nameof(ExecutionModes.All), MemberType = typeof(ExecutionModes))]
public void Worker_StdinTrue_ParentWriteReadableInWorker(ExecutionMode mode)
{
var files = new Dictionary<string, string>
{
["worker_in.ts"] = """
(process as any).stdin.on("data", (chunk: any) => { postMessage("GOT[" + ("" + chunk).trim() + "]"); });
(process as any).stdin.on("end", () => { postMessage("END"); });
postMessage("ready");
setTimeout(() => {}, 2000); // stay alive to receive stdin
""",
["main.ts"] = """
import { Worker } from "worker_threads";
const w: any = new Worker(__dirname + "/worker_in.ts", { stdin: true });
w.on("message", (e: any) => {
if (e.data === "ready") { w.stdin.write("ping"); w.stdin.end(); }
else { console.log(e.data); }
});
"""
};

var output = TestHarness.RunModules(files, "main.ts", mode);
Assert.Contains("GOT[ping]", output);
Assert.Contains("END", output);
}

/// <summary>
/// #1076: multiple parent writes arrive at the worker's <c>process.stdin</c> in order. The
/// worker accumulates each chunk and, on 'end', posts the concatenation — verifying both
/// ordering and that end() flushes after the last chunk. Dual-mode.
/// </summary>
[Theory]
[MemberData(nameof(ExecutionModes.All), MemberType = typeof(ExecutionModes))]
public void Worker_StdinTrue_MultipleWritesPreserveOrder(ExecutionMode mode)
{
var files = new Dictionary<string, string>
{
["worker_acc.ts"] = """
let acc = "";
(process as any).stdin.on("data", (chunk: any) => { acc += ("" + chunk); });
(process as any).stdin.on("end", () => { postMessage("ACC[" + acc + "]"); });
postMessage("ready");
setTimeout(() => {}, 2000);
""",
["main.ts"] = """
import { Worker } from "worker_threads";
const w: any = new Worker(__dirname + "/worker_acc.ts", { stdin: true });
w.on("message", (e: any) => {
if (e.data === "ready") { w.stdin.write("a"); w.stdin.write("b"); w.stdin.write("c"); w.stdin.end(); }
else { console.log(e.data); }
});
"""
};

var output = TestHarness.RunModules(files, "main.ts", mode);
Assert.Contains("ACC[abc]", output);
}

/// <summary>
/// #1076: without <c>stdin: true</c>, <c>worker.stdin</c> is not a pipe — the parent must opt
/// in, exactly like stdout/stderr. SharpTS surfaces the absent stream as <c>undefined</c>
/// (its GetMember convention for stdout/stderr too), which is falsy like Node's <c>null</c>,
/// so <c>!w.stdin</c> holds. Dual-mode.
/// </summary>
[Theory]
[MemberData(nameof(ExecutionModes.All), MemberType = typeof(ExecutionModes))]
public void Worker_StdinWithoutOption_IsAbsent(ExecutionMode mode)
{
var files = new Dictionary<string, string>
{
["worker_noop.ts"] = """
postMessage("hi");
""",
["main.ts"] = """
import { Worker } from "worker_threads";
const w: any = new Worker(__dirname + "/worker_noop.ts");
console.log("stdin-absent:" + (!w.stdin));
w.on("message", () => {});
"""
};

var output = TestHarness.RunModules(files, "main.ts", mode);
Assert.Contains("stdin-absent:true", output);
}

#endregion

#region markAsUntransferable (#1002)

/// <summary>
Expand Down
Loading