Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,25 @@ final class AgentCompletionEventMonitor: AgentActivityEventMonitoring {
var onEvent: ((AgentActivityEvent) -> Void)?

private let logURL: URL
private let lockURL: URL
private let queue = DispatchQueue(label: "com.shellraiser.completion-event-monitor")
private let compactionThresholdBytes: UInt64 = 64 * 1024
private let compactionThresholdBytes: UInt64
private let beforeCompactionAttempt: (() -> Void)?
private var source: DispatchSourceFileSystemObject?
private var fileDescriptor: CInt = -1
private var readOffset: UInt64 = 0
private var trailingFragment = ""

/// Creates a monitor for the provided event log and starts tailing new events.
init(logURL: URL) {
init(
logURL: URL,
compactionThresholdBytes: UInt64 = 64 * 1024,
beforeCompactionAttempt: (() -> Void)? = nil
) {
self.logURL = logURL
self.lockURL = logURL.appendingPathExtension("lock")
self.compactionThresholdBytes = compactionThresholdBytes
self.beforeCompactionAttempt = beforeCompactionAttempt
start()
}

Expand Down Expand Up @@ -129,14 +138,47 @@ final class AgentCompletionEventMonitor: AgentActivityEventMonitoring {
guard fileSize >= compactionThresholdBytes else { return }

do {
try FileHandle(forWritingTo: logURL).truncate(atOffset: 0)
readOffset = 0
CompletionDebugLogger.log("compacted completion log bytes=\(fileSize)")
beforeCompactionAttempt?()

try withLogLock {
let currentFileSize = currentFileSize()
guard trailingFragment.isEmpty else { return }
guard readOffset == currentFileSize else { return }
guard currentFileSize >= compactionThresholdBytes else { return }

let handle = try FileHandle(forWritingTo: logURL)
defer {
try? handle.close()
}

try handle.truncate(atOffset: 0)
readOffset = 0
CompletionDebugLogger.log("compacted completion log bytes=\(currentFileSize)")
}
} catch {
CompletionDebugLogger.log("failed to compact completion log: \(error)")
}
}

/// Serializes log compaction against helper appends using a sidecar advisory lock.
private func withLogLock<Result>(_ body: () throws -> Result) throws -> Result {
let descriptor = open(lockURL.path, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR)
guard descriptor >= 0 else {
throw NSError(domain: NSPOSIXErrorDomain, code: Int(errno))
}

defer {
_ = Darwin.lockf(descriptor, F_ULOCK, 0)
Darwin.close(descriptor)
}

guard Darwin.lockf(descriptor, F_LOCK, 0) == 0 else {
throw NSError(domain: NSPOSIXErrorDomain, code: Int(errno))
}

return try body()
}

/// Returns the current file size for the watched completion log.
private func currentFileSize() -> UInt64 {
guard let attributes = try? FileManager.default.attributesOfItem(atPath: logURL.path),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ final class AgentRuntimeBridge: AgentRuntimeSupporting {
fileManager.createFile(atPath: eventLogURL.path, contents: Data())
}

if !fileManager.fileExists(atPath: eventLogLockURL.path) {
fileManager.createFile(atPath: eventLogLockURL.path, contents: Data())
}

try writeExecutable(
named: "shellraiser-agent-complete",
contents: helperScriptContents
Expand Down Expand Up @@ -169,6 +173,11 @@ final class AgentRuntimeBridge: AgentRuntimeSupporting {
try data.write(to: fileURL, options: .atomic)
}

/// Returns the advisory lock file shared by helper writers and the monitor compactor.
private var eventLogLockURL: URL {
eventLogURL.appendingPathExtension("lock")
}

/// Shell helper that appends normalized activity events to the shared event log.
private var helperScriptContents: String {
#"""
Expand Down Expand Up @@ -215,7 +224,8 @@ final class AgentRuntimeBridge: AgentRuntimeSupporting {

timestamp="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
encoded="$(printf '%s' "$payload" | /usr/bin/base64 | tr -d '\n')"
printf '%s\t%s\t%s\t%s\t%s\n' "$timestamp" "$runtime" "$surface" "$phase" "$encoded" >> "${SHELLRAISER_EVENT_LOG}"
lock_file="${SHELLRAISER_EVENT_LOG}.lock"
/usr/bin/lockf "$lock_file" /bin/sh -c 'printf "%s\t%s\t%s\t%s\t%s\n" "$2" "$3" "$4" "$5" "$6" >> "$1"' sh "${SHELLRAISER_EVENT_LOG}" "$timestamp" "$runtime" "$surface" "$phase" "$encoded"
"""#
}

Expand Down
100 changes: 100 additions & 0 deletions Tests/ShellraiserTests/AgentCompletionEventMonitorTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,71 @@ final class AgentCompletionEventMonitorTests: XCTestCase {
withExtendedLifetime(monitor) {}
}

/// Verifies compaction truncates a fully consumed oversized log and tailing resumes from the reset offset.
func testMonitorCompactsOversizedLogAndContinuesReading() throws {
let logURL = try makeLogFile()
let firstSurfaceId = UUID(uuidString: "00000000-0000-0000-0000-000000001603")!
let secondSurfaceId = UUID(uuidString: "00000000-0000-0000-0000-000000001604")!
let firstExpectation = expectation(description: "First event emitted before compaction")
let secondExpectation = expectation(description: "Second event emitted after compaction")
let monitor = AgentCompletionEventMonitor(logURL: logURL, compactionThresholdBytes: 1)

monitor.onEvent = { event in
switch event.surfaceId {
case firstSurfaceId:
firstExpectation.fulfill()
case secondSurfaceId:
secondExpectation.fulfill()
default:
XCTFail("Unexpected surface id \(event.surfaceId)")
}
}

appendLine(eventLine(surfaceId: firstSurfaceId, payload: "first"), to: logURL)
wait(for: [firstExpectation], timeout: 1.0)
XCTAssertTrue(waitForCondition(timeout: 1.0) {
self.currentFileSize(of: logURL) == 0
})

appendLine(eventLine(surfaceId: secondSurfaceId, payload: "second"), to: logURL)
wait(for: [secondExpectation], timeout: 1.0)
withExtendedLifetime(monitor) {}
}

/// Verifies compaction skips truncation when a new append lands after reading but before the lock is acquired.
func testMonitorSkipsCompactionWhenNewEventArrivesDuringRaceWindow() throws {
let logURL = try makeLogFile()
let firstSurfaceId = UUID(uuidString: "00000000-0000-0000-0000-000000001605")!
let secondSurfaceId = UUID(uuidString: "00000000-0000-0000-0000-000000001606")!
let appendExpectation = expectation(description: "Racing append executed")
let firstExpectation = expectation(description: "First event emitted")
let secondExpectation = expectation(description: "Racing event emitted")
let monitor = AgentCompletionEventMonitor(
logURL: logURL,
compactionThresholdBytes: 1,
beforeCompactionAttempt: {
self.appendLine(self.eventLine(surfaceId: secondSurfaceId, payload: "racing"), to: logURL)
appendExpectation.fulfill()
}
)

monitor.onEvent = { event in
switch event.surfaceId {
case firstSurfaceId:
firstExpectation.fulfill()
case secondSurfaceId:
secondExpectation.fulfill()
default:
XCTFail("Unexpected surface id \(event.surfaceId)")
}
}

appendLine(eventLine(surfaceId: firstSurfaceId, payload: "first"), to: logURL)

wait(for: [appendExpectation, firstExpectation, secondExpectation], timeout: 1.0)
withExtendedLifetime(monitor) {}
}

/// Creates a disposable event log file for monitor tests.
private func makeLogFile(initialContents: String = "") throws -> URL {
let url = FileManager.default.temporaryDirectory.appendingPathComponent(
Expand All @@ -72,11 +137,46 @@ final class AgentCompletionEventMonitorTests: XCTestCase {
return url
}

/// Returns a valid completion-event log line for the provided surface and payload.
private func eventLine(
surfaceId: UUID,
payload: String,
timestamp: String = "2026-03-08T20:32:00Z"
) -> String {
let encodedPayload = Data(payload.utf8).base64EncodedString()
return "\(timestamp)\tcodex\t\(surfaceId.uuidString)\tcompleted\t\(encodedPayload)"
}

/// Appends a newline-terminated log line to an existing test file.
private func appendLine(_ line: String, to url: URL) {
let handle = try! FileHandle(forWritingTo: url)
try! handle.seekToEnd()
handle.write(Data((line + "\n").utf8))
try! handle.close()
}

/// Returns the current byte size for the supplied log file.
private func currentFileSize(of url: URL) -> UInt64 {
let attributes = try! FileManager.default.attributesOfItem(atPath: url.path)
let size = attributes[.size] as! NSNumber
return size.uint64Value
}

/// Polls until the supplied condition becomes true or the timeout elapses.
private func waitForCondition(
timeout: TimeInterval,
interval: TimeInterval = 0.01,
condition: @escaping () -> Bool
) -> Bool {
let deadline = Date().addingTimeInterval(timeout)
while Date() < deadline {
if condition() {
return true
}

RunLoop.current.run(until: Date().addingTimeInterval(interval))
}

return condition()
}
}
4 changes: 4 additions & 0 deletions Tests/ShellraiserTests/AgentRuntimeBridgeTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,18 @@ final class AgentRuntimeBridgeTests: XCTestCase {
func testPrepareRuntimeSupportWritesHelperWithoutBareCodexCase() throws {
let bridge = try makeBridge()
let helperURL = bridge.binDirectory.appendingPathComponent("shellraiser-agent-complete")
let lockURL = bridge.runtimeDirectory.appendingPathComponent("agent-completions.log.lock")

bridge.prepareRuntimeSupport()

let helperContents = try String(contentsOf: helperURL, encoding: .utf8)

XCTAssertTrue(helperContents.contains("codex:completed)"))
XCTAssertTrue(helperContents.contains("codex:session|claudeCode:session)"))
XCTAssertTrue(helperContents.contains("/usr/bin/lockf"))
XCTAssertTrue(helperContents.contains("${SHELLRAISER_EVENT_LOG}.lock"))
XCTAssertFalse(helperContents.contains("\n codex)\n"))
XCTAssertTrue(FileManager.default.fileExists(atPath: lockURL.path))
}

/// Verifies runtime wrappers emit session identity metadata for later resume.
Expand Down
Loading