Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions apps/cli/lib/daemon-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,13 @@ const daemonListProcessesSuccessResponseSchema = z.object( {

// Cache the process list returned from the process manager for a very short time to make multiple
// calls in quick succession more efficient
const listProcesses = cacheFunctionTTL( async () => {
async function listProcesses() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the cacheFunctionTTL is not crucial to this PR's purpose. I just started viewing it as a liability while working on the stopWordPressServer logic. isProcessRunning depends on listProcesses, which caches its result for 1 second. That could easily lead to a sneaky bug at some point, and communicating with the process manager daemon over a socket is pretty fast to begin with.

await connectToDaemon();
const response = await sendDaemonRequest( {
type: 'list-processes',
} );
return daemonListProcessesSuccessResponseSchema.parse( response ).processes;
} );
}

export async function getDaemonBus(): Promise< DaemonBus > {
if ( ! daemonBus ) {
Expand Down Expand Up @@ -289,10 +290,6 @@ export async function isProcessRunning(
processName: string
): Promise< ProcessDescription | undefined > {
try {
if ( ! isConnected ) {
return undefined;
}

const processes = await listProcesses();
return processes.find( ( p ) => p.name === processName && p.status === 'online' );
} catch ( error ) {
Expand Down Expand Up @@ -322,6 +319,12 @@ export async function startProcess(
}

export async function stopProcess( processName: string ): Promise< void > {
const runningProcess = await isProcessRunning( processName );

if ( ! runningProcess ) {
return;
}

await connectToDaemon();
await sendDaemonRequest( {
type: 'stop-process',
Expand Down
48 changes: 43 additions & 5 deletions apps/cli/lib/tests/wordpress-server-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ describe( 'WordPress Server Manager', () => {
vi.mocked( daemonClient.startProcess ).mockResolvedValue( mockProcessDescription );
vi.mocked( daemonClient.stopProcess ).mockResolvedValue( undefined );
vi.mocked( daemonClient.getDaemonBus ).mockResolvedValue( mockBus as DaemonBus );
vi.mocked( daemonClient.sendMessageToProcess ).mockReturnValue( Promise.resolve() );
} );

afterEach( () => {
Expand Down Expand Up @@ -133,14 +134,51 @@ describe( 'WordPress Server Manager', () => {

describe( 'stopWordPressServer', () => {
it( 'should stop WordPress server with correct process name', async () => {
await stopWordPressServer( 'test-site-id' );
vi.mocked( daemonClient.isProcessRunning ).mockResolvedValue( {
name: 'studio-site-test-site-id',
pmId: 1,
status: 'online',
pid: 1234,
} );

expect( vi.mocked( daemonClient.stopProcess ) ).toHaveBeenCalledWith(
'studio-site-test-site-id'
);
vi.mocked( daemonClient.sendMessageToProcess ).mockImplementation( ( processId, message ) => {
setImmediate( () => {
mockBus.emit( 'process-message', {
process: { name: 'studio-site-test-site-id', pm_id: 1 },
raw: {
topic: 'result',
originalMessageId: message.messageId,
},
} );
} );

return Promise.resolve();
} );

const promise = stopWordPressServer( 'test-site-id' );

setTimeout( () => {
mockBus.emit( 'process-event', {
process: { name: 'studio-site-test-site-id', pm_id: 1 },
event: 'exit',
} );
}, 500 );

await promise;

expect( vi.mocked( daemonClient.stopProcess ) ).not.toHaveBeenCalled();
} );

it( 'should propagate stopProcess errors', async () => {
it( 'should propagate errors from fallback `stopProcess` call', async () => {
vi.mocked( daemonClient.isProcessRunning ).mockResolvedValue( {
name: 'studio-site-test-site-id',
pmId: 1,
status: 'online',
pid: 1234,
} );
vi.mocked( daemonClient.sendMessageToProcess ).mockRejectedValue(
new Error( 'Failed to send stop message' )
);
vi.mocked( daemonClient.stopProcess ).mockRejectedValue(
new Error( 'Failed to stop process' )
);
Expand Down
50 changes: 37 additions & 13 deletions apps/cli/lib/wordpress-server-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ export async function sendMessage(
? `Maximum timeout of ${ maxTotalElapsedTime / 1000 }s exceeded`
: `No activity for ${ PLAYGROUND_CLI_INACTIVITY_TIMEOUT / 1000 }s`;
reject(
new Error( `Timeout waiting for response to message ${ messageId }: ${ timeoutReason }` )
new Error(
`Timeout waiting for response to message ${ message.topic }: ${ timeoutReason }`
)
);
}
}, PLAYGROUND_CLI_ACTIVITY_CHECK_INTERVAL );
Expand Down Expand Up @@ -279,20 +281,42 @@ export async function stopWordPressServer( siteId: string ): Promise< void > {
const processName = getProcessName( siteId );
const runningProcess = await isProcessRunning( processName );

if ( runningProcess ) {
try {
await sendMessage(
runningProcess.pmId,
processName,
{ topic: 'stop-server', data: {} },
{ maxTotalElapsedTime: GRACEFUL_STOP_TIMEOUT }
);
} catch {
// Graceful shutdown failed, `stopProcess()` will handle it
}
if ( ! runningProcess ) {
return;
}

return stopProcess( processName );
try {
const bus = await getDaemonBus();
let busExitEventListener: ( event: DaemonBusEventMap[ 'process-event' ] ) => void;

const exitPromise = new Promise< void >( ( resolve ) => {
busExitEventListener = ( event: DaemonBusEventMap[ 'process-event' ] ) => {
if ( event.process.name === processName && event.event === 'exit' ) {
resolve();
}
};

bus.on( 'process-event', busExitEventListener );
} ).finally( () => {
bus.off( 'process-event', busExitEventListener );
} );

await sendMessage(
runningProcess.pmId,
processName,
{ topic: 'stop-server', data: {} },
{ maxTotalElapsedTime: GRACEFUL_STOP_TIMEOUT }
);

// Allow 5 seconds (arbitrary number) of cleanup time for the child process before throwing an
// exception and telling the process manager to send a SIGKILL signal.
await Promise.race( [
exitPromise,
new Promise( ( resolve, reject ) => setTimeout( reject, 5000 ) ),
] );
} catch {
return stopProcess( processName );
}
}

export interface RunBlueprintOptions {
Expand Down
38 changes: 35 additions & 3 deletions apps/cli/process-manager-daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { ChildProcess, spawn } from 'child_process';
import fs, { createWriteStream, WriteStream } from 'fs';
import net from 'net';
import path from 'path';
import readline from 'readline';
import semver from 'semver';
import {
PROCESS_MANAGER_LOGS_DIR,
Expand Down Expand Up @@ -51,6 +52,19 @@ function getProcessLogPaths( processName: string ) {
};
}

function timestampLogLine( line: string ): string {
return `${ new Date().toISOString() } ${ line }\n`;
}

function writeTimestampedLines( target: WriteStream, content: string ) {
const normalizedContent = content.split( '\r\n' ).join( '\n' );
const lines = normalizedContent.trimEnd().split( '\n' );

lines.forEach( ( line ) => {
target.write( timestampLogLine( line ) );
} );
}

export class ProcessManagerDaemon {
private readonly controlServer = new SocketServer(
PROCESS_MANAGER_CONTROL_SOCKET_PATH,
Expand Down Expand Up @@ -210,8 +224,8 @@ export class ProcessManagerDaemon {

this.managedProcesses.set( pmId, managedProcess );

child.stdout?.pipe( stdoutStream );
child.stderr?.pipe( stderrStream );
this.pipeOutputWithTimestamp( child.stdout, stdoutStream );
this.pipeOutputWithTimestamp( child.stderr, stderrStream );

child.on( 'message', ( raw ) => {
const event = daemonEventSchema.safeParse( {
Expand All @@ -228,7 +242,7 @@ export class ProcessManagerDaemon {
} );

child.on( 'error', ( error ) => {
void stderrStream.write( `${ error.stack ?? error.message }\n` );
writeTimestampedLines( stderrStream, error.stack ?? error.message );
void this.handleProcessExit( managedProcess );
} );

Expand Down Expand Up @@ -320,6 +334,24 @@ export class ProcessManagerDaemon {
this.eventsServer.broadcast( event );
}

private pipeOutputWithTimestamp(
input: NodeJS.ReadableStream | null,
target: WriteStream
): void {
if ( ! input ) {
return;
}

const lineReader = readline.createInterface( {
input,
crlfDelay: Infinity,
} );

lineReader.on( 'line', ( line ) => {
void target.write( timestampLogLine( line ) );
} );
}

private toProcessDescription( managedProcess: ManagedProcess ): ProcessDescription {
if ( managedProcess.status === 'stopped' ) {
return {
Expand Down
Loading
Loading