From e777c146636ff9ef51fbb476f4c079d6682a6e18 Mon Sep 17 00:00:00 2001 From: Nick Nassiri Date: Sat, 4 Jul 2026 18:55:12 -0700 Subject: [PATCH] fix(worker_threads): gate compiled $MessagePort.Start() Ref by cross-thread status (#1254) The emitted $MessagePort.Start() unconditionally Ref'd the event loop, so a started, unclosed in-process MessageChannel port hung a compiled program forever. SharpTSMessagePort only Refs for a cross-thread (worker-transferred) port, so this was a compiled-vs-interpreter divergence. Mirrors the interpreter's _crossThread/MarkTransferredAcrossThreads split: $MessagePort gains a _crossThread field and a MarkTransferredAcrossThreads() method (recurses to the partner), and Start() only Refs when set. CompiledMessagePortBridge.Adopt now calls it when a compiled port is transferred to a worker, so both ends still Ref while listening (#406). --- Compilation/RuntimeEmitter.MessageChannel.cs | 81 ++++++++++++++++++- Runtime/Types/CompiledMessagePortBridge.cs | 10 +++ .../SharedTests/WorkerThreadsTests.cs | 20 +++++ 3 files changed, 107 insertions(+), 4 deletions(-) diff --git a/Compilation/RuntimeEmitter.MessageChannel.cs b/Compilation/RuntimeEmitter.MessageChannel.cs index 9475ccf0..49d17381 100644 --- a/Compilation/RuntimeEmitter.MessageChannel.cs +++ b/Compilation/RuntimeEmitter.MessageChannel.cs @@ -31,6 +31,11 @@ public partial class RuntimeEmitter private FieldBuilder _messagePortStartedField = null!; private FieldBuilder _messagePortClosedField = null!; private FieldBuilder _messagePortRefedField = null!; + // Set (recursively, onto the partner too) when this port or its partner has been + // transferred to a worker (#1254). Only a started CROSS-THREAD port Refs the owner + // loop — mirrors SharpTSMessagePort._crossThread — so a plain in-process port with a + // drained queue doesn't keep a compiled program running forever. + private FieldBuilder _messagePortCrossThreadField = null!; private FieldBuilder _messagePortOnEnqueueField = null!; // Shared sentinel enqueued (in place of a clone) when postMessage receives an // uncloneable value. Drain turns it into a 'messageerror' event, mirroring the @@ -65,6 +70,7 @@ private void EmitMessagePortClass(ModuleBuilder moduleBuilder, EmittedRuntime ru _messagePortStartedField = typeBuilder.DefineField("_started", _types.Boolean, FieldAttributes.Assembly); _messagePortClosedField = typeBuilder.DefineField("_closed", _types.Boolean, FieldAttributes.Assembly); _messagePortRefedField = typeBuilder.DefineField("_refed", _types.Boolean, FieldAttributes.Assembly); + _messagePortCrossThreadField = typeBuilder.DefineField("_crossThread", _types.Boolean, FieldAttributes.Assembly); // Optional on-enqueue notification. Null for ordinary in-process ports; set // (reflectively) by CompiledMessagePortBridge when this port has been // transferred to an interpreter worker, so a parent post wakes the worker loop @@ -87,6 +93,7 @@ private void EmitMessagePortClass(ModuleBuilder moduleBuilder, EmittedRuntime ru EmitMessagePortDrain(typeBuilder, runtime); EmitMessagePortRef(typeBuilder, runtime); EmitMessagePortUnref(typeBuilder, runtime); + EmitMessagePortMarkTransferredAcrossThreads(typeBuilder, runtime); EmitMessagePortStart(typeBuilder, runtime); EmitMessagePortPostMessage(typeBuilder, runtime); EmitMessagePortClose(typeBuilder, runtime); @@ -238,9 +245,70 @@ private void EmitMessagePortUnref(TypeBuilder typeBuilder, EmittedRuntime runtim } /// - /// Start() — begins message delivery: Refs the event loop (a started, - /// unclosed port keeps the process alive) and schedules a Drain for - /// anything queued before the port started. + /// MarkTransferredAcrossThreads() — flags this port (and, recursively, its + /// partner) as cross-thread once either end has been transferred to a worker. + /// Mirrors SharpTSMessagePort.MarkTransferredAcrossThreads (#406): a plain + /// in-process channel never sets this, so its started ports don't Ref the loop + /// (#1254); once one end is handed to a worker, BOTH ends must Ref while + /// listening — the worker end via 's own + /// keep-alive, and this (compiled) end via Start()'s gated Ref(). + /// If this port is already started when the transfer is recorded, Ref it now + /// since Start() won't run again. + /// + private void EmitMessagePortMarkTransferredAcrossThreads(TypeBuilder typeBuilder, EmittedRuntime runtime) + { + var method = typeBuilder.DefineMethod("MarkTransferredAcrossThreads", MethodAttributes.Public, _types.Void, Type.EmptyTypes); + + var il = method.GetILGenerator(); + var exitLabel = il.DefineLabel(); + var skipRefLabel = il.DefineLabel(); + var noPartnerLabel = il.DefineLabel(); + + // if (_crossThread) return; + il.Emit(OpCodes.Ldarg_0); + il.Emit(OpCodes.Ldfld, _messagePortCrossThreadField); + il.Emit(OpCodes.Brtrue, exitLabel); + + // _crossThread = true; + il.Emit(OpCodes.Ldarg_0); + il.Emit(OpCodes.Ldc_I4_1); + il.Emit(OpCodes.Stfld, _messagePortCrossThreadField); + + // if (_started && !_closed && !_refed) this.Ref(); + il.Emit(OpCodes.Ldarg_0); + il.Emit(OpCodes.Ldfld, _messagePortStartedField); + il.Emit(OpCodes.Brfalse, skipRefLabel); + il.Emit(OpCodes.Ldarg_0); + il.Emit(OpCodes.Ldfld, _messagePortClosedField); + il.Emit(OpCodes.Brtrue, skipRefLabel); + il.Emit(OpCodes.Ldarg_0); + il.Emit(OpCodes.Ldfld, _messagePortRefedField); + il.Emit(OpCodes.Brtrue, skipRefLabel); + il.Emit(OpCodes.Ldarg_0); + il.Emit(OpCodes.Call, _messagePortRef); + il.MarkLabel(skipRefLabel); + + // var partner = _partner as $MessagePort; partner?.MarkTransferredAcrossThreads(); + il.Emit(OpCodes.Ldarg_0); + il.Emit(OpCodes.Ldfld, _messagePortPartnerField); + il.Emit(OpCodes.Isinst, typeBuilder); + il.Emit(OpCodes.Dup); + il.Emit(OpCodes.Brfalse, noPartnerLabel); + il.Emit(OpCodes.Call, method); + il.Emit(OpCodes.Br, exitLabel); + il.MarkLabel(noPartnerLabel); + il.Emit(OpCodes.Pop); + + il.MarkLabel(exitLabel); + il.Emit(OpCodes.Ret); + } + + /// + /// Start() — begins message delivery: Refs the event loop when this port is + /// cross-thread (a started, unclosed cross-thread port keeps the process alive, + /// #406) and schedules a Drain for anything queued before the port started. A + /// plain in-process port (never transferred, #1254) does not Ref — its queue + /// drains synchronously and shouldn't keep the process running forever. /// private void EmitMessagePortStart(TypeBuilder typeBuilder, EmittedRuntime runtime) { @@ -249,6 +317,7 @@ private void EmitMessagePortStart(TypeBuilder typeBuilder, EmittedRuntime runtim var il = method.GetILGenerator(); var exitLabel = il.DefineLabel(); + var skipRefLabel = il.DefineLabel(); // if (_started || _closed) return il.Emit(OpCodes.Ldarg_0); @@ -263,9 +332,13 @@ private void EmitMessagePortStart(TypeBuilder typeBuilder, EmittedRuntime runtim il.Emit(OpCodes.Ldc_I4_1); il.Emit(OpCodes.Stfld, _messagePortStartedField); - // this.Ref() + // if (_crossThread) this.Ref(); + il.Emit(OpCodes.Ldarg_0); + il.Emit(OpCodes.Ldfld, _messagePortCrossThreadField); + il.Emit(OpCodes.Brfalse, skipRefLabel); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Call, _messagePortRef); + il.MarkLabel(skipRefLabel); // $EventLoop.GetInstance().Schedule(new Action(this.Drain)) il.Emit(OpCodes.Call, runtime.EventLoopGetInstance); diff --git a/Runtime/Types/CompiledMessagePortBridge.cs b/Runtime/Types/CompiledMessagePortBridge.cs index bed007f9..4a6eb16f 100644 --- a/Runtime/Types/CompiledMessagePortBridge.cs +++ b/Runtime/Types/CompiledMessagePortBridge.cs @@ -104,6 +104,16 @@ public static CompiledMessagePortBridge Adopt(object compiledPort) ?? throw new StructuredClone.DataCloneError( "Cannot transfer $MessagePort: no _onEnqueue field (emitted shape changed?)"); + var markTransferred = type.GetMethod("MarkTransferredAcrossThreads", Type.EmptyTypes) + ?? throw new StructuredClone.DataCloneError( + "Cannot transfer $MessagePort: no MarkTransferredAcrossThreads method (emitted shape changed?)"); + // Flags this port AND its (compiled, still parent-side) partner cross-thread, so a + // parent-side Start() on the partner Refs the parent loop while it awaits a reply + // through this now-bridged port (#406) — a plain, never-transferred in-process port + // must NOT Ref (#1254). This bridge manages its own worker-loop keep-alive separately + // in Start() below, independent of the compiled port's _refed state. + markTransferred.Invoke(compiledPort, null); + return new CompiledMessagePortBridge(compiledPort, postMessage, incoming, onEnqueueField); } diff --git a/SharpTS.Tests/SharedTests/WorkerThreadsTests.cs b/SharpTS.Tests/SharedTests/WorkerThreadsTests.cs index a8b8b990..b733b958 100644 --- a/SharpTS.Tests/SharedTests/WorkerThreadsTests.cs +++ b/SharpTS.Tests/SharedTests/WorkerThreadsTests.cs @@ -338,6 +338,26 @@ public void MessageChannel_MessagesPostedBeforeListener_DeliveredInOrderAfterImp Assert.Contains("got: first\ngot: second", output); } + [Theory] + [MemberData(nameof(ExecutionModes.All), MemberType = typeof(ExecutionModes))] + public void MessageChannel_UnclosedPort_DoesNotHangProcess(ExecutionMode mode) + { + // #1254: a plain in-process MessageChannel (neither port ever transferred to a + // worker) must not keep the event loop alive once its queue drains — matching + // SharpTSMessagePort, which only Refs a started CROSS-THREAD port (#406). Before + // the fix, the compiled $MessagePort.Start() unconditionally Ref'd the loop, so + // this program hung forever in compiled mode despite the port never closing. + var source = @" + let channel: any = new MessageChannel(); + channel.port2.on('message', (value: any) => { + console.log('got: ' + value); + }); + channel.port1.postMessage('hi'); + "; + var output = TestHarness.Run(source, mode); + Assert.Equal("got: hi\n", output); + } + #endregion #region StructuredClone Tests