Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 77 additions & 4 deletions Compilation/RuntimeEmitter.MessageChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public partial class RuntimeEmitter
private FieldBuilder _messagePortStartedField = null!;
private FieldBuilder _messagePortClosedField = null!;
private FieldBuilder _messagePortRefedField = null!;
// Set (recursively, onto the partner too) when this port or its partner has been
// transferred to a worker (#1254). Only a started CROSS-THREAD port Refs the owner
// loop — mirrors SharpTSMessagePort._crossThread — so a plain in-process port with a
// drained queue doesn't keep a compiled program running forever.
private FieldBuilder _messagePortCrossThreadField = null!;
private FieldBuilder _messagePortOnEnqueueField = null!;
// Shared sentinel enqueued (in place of a clone) when postMessage receives an
// uncloneable value. Drain turns it into a 'messageerror' event, mirroring the
Expand Down Expand Up @@ -65,6 +70,7 @@ private void EmitMessagePortClass(ModuleBuilder moduleBuilder, EmittedRuntime ru
_messagePortStartedField = typeBuilder.DefineField("_started", _types.Boolean, FieldAttributes.Assembly);
_messagePortClosedField = typeBuilder.DefineField("_closed", _types.Boolean, FieldAttributes.Assembly);
_messagePortRefedField = typeBuilder.DefineField("_refed", _types.Boolean, FieldAttributes.Assembly);
_messagePortCrossThreadField = typeBuilder.DefineField("_crossThread", _types.Boolean, FieldAttributes.Assembly);
// Optional on-enqueue notification. Null for ordinary in-process ports; set
// (reflectively) by CompiledMessagePortBridge when this port has been
// transferred to an interpreter worker, so a parent post wakes the worker loop
Expand All @@ -87,6 +93,7 @@ private void EmitMessagePortClass(ModuleBuilder moduleBuilder, EmittedRuntime ru
EmitMessagePortDrain(typeBuilder, runtime);
EmitMessagePortRef(typeBuilder, runtime);
EmitMessagePortUnref(typeBuilder, runtime);
EmitMessagePortMarkTransferredAcrossThreads(typeBuilder, runtime);
EmitMessagePortStart(typeBuilder, runtime);
EmitMessagePortPostMessage(typeBuilder, runtime);
EmitMessagePortClose(typeBuilder, runtime);
Expand Down Expand Up @@ -238,9 +245,70 @@ private void EmitMessagePortUnref(TypeBuilder typeBuilder, EmittedRuntime runtim
}

/// <summary>
/// Start() — begins message delivery: Refs the event loop (a started,
/// unclosed port keeps the process alive) and schedules a Drain for
/// anything queued before the port started.
/// MarkTransferredAcrossThreads() — flags this port (and, recursively, its
/// partner) as cross-thread once either end has been transferred to a worker.
/// Mirrors <c>SharpTSMessagePort.MarkTransferredAcrossThreads</c> (#406): a plain
/// in-process channel never sets this, so its started ports don't Ref the loop
/// (#1254); once one end is handed to a worker, BOTH ends must Ref while
/// listening — the worker end via <see cref="CompiledMessagePortBridge"/>'s own
/// keep-alive, and this (compiled) end via <c>Start()</c>'s gated <c>Ref()</c>.
/// If this port is already started when the transfer is recorded, Ref it now
/// since <c>Start()</c> won't run again.
/// </summary>
private void EmitMessagePortMarkTransferredAcrossThreads(TypeBuilder typeBuilder, EmittedRuntime runtime)
{
var method = typeBuilder.DefineMethod("MarkTransferredAcrossThreads", MethodAttributes.Public, _types.Void, Type.EmptyTypes);

var il = method.GetILGenerator();
var exitLabel = il.DefineLabel();
var skipRefLabel = il.DefineLabel();
var noPartnerLabel = il.DefineLabel();

// if (_crossThread) return;
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, _messagePortCrossThreadField);
il.Emit(OpCodes.Brtrue, exitLabel);

// _crossThread = true;
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldc_I4_1);
il.Emit(OpCodes.Stfld, _messagePortCrossThreadField);

// if (_started && !_closed && !_refed) this.Ref();
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, _messagePortStartedField);
il.Emit(OpCodes.Brfalse, skipRefLabel);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, _messagePortClosedField);
il.Emit(OpCodes.Brtrue, skipRefLabel);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, _messagePortRefedField);
il.Emit(OpCodes.Brtrue, skipRefLabel);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Call, _messagePortRef);
il.MarkLabel(skipRefLabel);

// var partner = _partner as $MessagePort; partner?.MarkTransferredAcrossThreads();
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, _messagePortPartnerField);
il.Emit(OpCodes.Isinst, typeBuilder);
il.Emit(OpCodes.Dup);
il.Emit(OpCodes.Brfalse, noPartnerLabel);
il.Emit(OpCodes.Call, method);
il.Emit(OpCodes.Br, exitLabel);
il.MarkLabel(noPartnerLabel);
il.Emit(OpCodes.Pop);

il.MarkLabel(exitLabel);
il.Emit(OpCodes.Ret);
}

/// <summary>
/// Start() — begins message delivery: Refs the event loop when this port is
/// cross-thread (a started, unclosed cross-thread port keeps the process alive,
/// #406) and schedules a Drain for anything queued before the port started. A
/// plain in-process port (never transferred, #1254) does not Ref — its queue
/// drains synchronously and shouldn't keep the process running forever.
/// </summary>
private void EmitMessagePortStart(TypeBuilder typeBuilder, EmittedRuntime runtime)
{
Expand All @@ -249,6 +317,7 @@ private void EmitMessagePortStart(TypeBuilder typeBuilder, EmittedRuntime runtim

var il = method.GetILGenerator();
var exitLabel = il.DefineLabel();
var skipRefLabel = il.DefineLabel();

// if (_started || _closed) return
il.Emit(OpCodes.Ldarg_0);
Expand All @@ -263,9 +332,13 @@ private void EmitMessagePortStart(TypeBuilder typeBuilder, EmittedRuntime runtim
il.Emit(OpCodes.Ldc_I4_1);
il.Emit(OpCodes.Stfld, _messagePortStartedField);

// this.Ref()
// if (_crossThread) this.Ref();
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, _messagePortCrossThreadField);
il.Emit(OpCodes.Brfalse, skipRefLabel);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Call, _messagePortRef);
il.MarkLabel(skipRefLabel);

// $EventLoop.GetInstance().Schedule(new Action(this.Drain))
il.Emit(OpCodes.Call, runtime.EventLoopGetInstance);
Expand Down
10 changes: 10 additions & 0 deletions Runtime/Types/CompiledMessagePortBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ public static CompiledMessagePortBridge Adopt(object compiledPort)
?? throw new StructuredClone.DataCloneError(
"Cannot transfer $MessagePort: no _onEnqueue field (emitted shape changed?)");

var markTransferred = type.GetMethod("MarkTransferredAcrossThreads", Type.EmptyTypes)
?? throw new StructuredClone.DataCloneError(
"Cannot transfer $MessagePort: no MarkTransferredAcrossThreads method (emitted shape changed?)");
// Flags this port AND its (compiled, still parent-side) partner cross-thread, so a
// parent-side Start() on the partner Refs the parent loop while it awaits a reply
// through this now-bridged port (#406) — a plain, never-transferred in-process port
// must NOT Ref (#1254). This bridge manages its own worker-loop keep-alive separately
// in Start() below, independent of the compiled port's _refed state.
markTransferred.Invoke(compiledPort, null);

return new CompiledMessagePortBridge(compiledPort, postMessage, incoming, onEnqueueField);
}

Expand Down
20 changes: 20 additions & 0 deletions SharpTS.Tests/SharedTests/WorkerThreadsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,26 @@ public void MessageChannel_MessagesPostedBeforeListener_DeliveredInOrderAfterImp
Assert.Contains("got: first\ngot: second", output);
}

[Theory]
[MemberData(nameof(ExecutionModes.All), MemberType = typeof(ExecutionModes))]
public void MessageChannel_UnclosedPort_DoesNotHangProcess(ExecutionMode mode)
{
// #1254: a plain in-process MessageChannel (neither port ever transferred to a
// worker) must not keep the event loop alive once its queue drains — matching
// SharpTSMessagePort, which only Refs a started CROSS-THREAD port (#406). Before
// the fix, the compiled $MessagePort.Start() unconditionally Ref'd the loop, so
// this program hung forever in compiled mode despite the port never closing.
var source = @"
let channel: any = new MessageChannel();
channel.port2.on('message', (value: any) => {
console.log('got: ' + value);
});
channel.port1.postMessage('hi');
";
var output = TestHarness.Run(source, mode);
Assert.Equal("got: hi\n", output);
}

#endregion

#region StructuredClone Tests
Expand Down
Loading