diff --git a/Sources/Shellraiser/Infrastructure/Agents/AgentCompletionEventMonitor.swift b/Sources/Shellraiser/Infrastructure/Agents/AgentCompletionEventMonitor.swift index 0ff8040..d9ddcdc 100644 --- a/Sources/Shellraiser/Infrastructure/Agents/AgentCompletionEventMonitor.swift +++ b/Sources/Shellraiser/Infrastructure/Agents/AgentCompletionEventMonitor.swift @@ -5,16 +5,25 @@ final class AgentCompletionEventMonitor: AgentActivityEventMonitoring { var onEvent: ((AgentActivityEvent) -> Void)? private let logURL: URL + private let lockURL: URL private let queue = DispatchQueue(label: "com.shellraiser.completion-event-monitor") - private let compactionThresholdBytes: UInt64 = 64 * 1024 + private let compactionThresholdBytes: UInt64 + private let beforeCompactionAttempt: (() -> Void)? private var source: DispatchSourceFileSystemObject? private var fileDescriptor: CInt = -1 private var readOffset: UInt64 = 0 private var trailingFragment = "" /// Creates a monitor for the provided event log and starts tailing new events. - init(logURL: URL) { + init( + logURL: URL, + compactionThresholdBytes: UInt64 = 64 * 1024, + beforeCompactionAttempt: (() -> Void)? = nil + ) { self.logURL = logURL + self.lockURL = logURL.appendingPathExtension("lock") + self.compactionThresholdBytes = compactionThresholdBytes + self.beforeCompactionAttempt = beforeCompactionAttempt start() } @@ -129,14 +138,47 @@ final class AgentCompletionEventMonitor: AgentActivityEventMonitoring { guard fileSize >= compactionThresholdBytes else { return } do { - try FileHandle(forWritingTo: logURL).truncate(atOffset: 0) - readOffset = 0 - CompletionDebugLogger.log("compacted completion log bytes=\(fileSize)") + beforeCompactionAttempt?() + + try withLogLock { + let currentFileSize = currentFileSize() + guard trailingFragment.isEmpty else { return } + guard readOffset == currentFileSize else { return } + guard currentFileSize >= compactionThresholdBytes else { return } + + let handle = try FileHandle(forWritingTo: logURL) + defer { + try? handle.close() + } + + try handle.truncate(atOffset: 0) + readOffset = 0 + CompletionDebugLogger.log("compacted completion log bytes=\(currentFileSize)") + } } catch { CompletionDebugLogger.log("failed to compact completion log: \(error)") } } + /// Serializes log compaction against helper appends using a sidecar advisory lock. + private func withLogLock(_ body: () throws -> Result) throws -> Result { + let descriptor = open(lockURL.path, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR) + guard descriptor >= 0 else { + throw NSError(domain: NSPOSIXErrorDomain, code: Int(errno)) + } + + defer { + _ = Darwin.lockf(descriptor, F_ULOCK, 0) + Darwin.close(descriptor) + } + + guard Darwin.lockf(descriptor, F_LOCK, 0) == 0 else { + throw NSError(domain: NSPOSIXErrorDomain, code: Int(errno)) + } + + return try body() + } + /// Returns the current file size for the watched completion log. private func currentFileSize() -> UInt64 { guard let attributes = try? FileManager.default.attributesOfItem(atPath: logURL.path), diff --git a/Sources/Shellraiser/Infrastructure/Agents/AgentRuntimeBridge.swift b/Sources/Shellraiser/Infrastructure/Agents/AgentRuntimeBridge.swift index 151a2b1..b355842 100644 --- a/Sources/Shellraiser/Infrastructure/Agents/AgentRuntimeBridge.swift +++ b/Sources/Shellraiser/Infrastructure/Agents/AgentRuntimeBridge.swift @@ -43,6 +43,10 @@ final class AgentRuntimeBridge: AgentRuntimeSupporting { fileManager.createFile(atPath: eventLogURL.path, contents: Data()) } + if !fileManager.fileExists(atPath: eventLogLockURL.path) { + fileManager.createFile(atPath: eventLogLockURL.path, contents: Data()) + } + try writeExecutable( named: "shellraiser-agent-complete", contents: helperScriptContents @@ -169,6 +173,11 @@ final class AgentRuntimeBridge: AgentRuntimeSupporting { try data.write(to: fileURL, options: .atomic) } + /// Returns the advisory lock file shared by helper writers and the monitor compactor. + private var eventLogLockURL: URL { + eventLogURL.appendingPathExtension("lock") + } + /// Shell helper that appends normalized activity events to the shared event log. private var helperScriptContents: String { #""" @@ -215,7 +224,8 @@ final class AgentRuntimeBridge: AgentRuntimeSupporting { timestamp="$(date -u +"%Y-%m-%dT%H:%M:%SZ")" encoded="$(printf '%s' "$payload" | /usr/bin/base64 | tr -d '\n')" - printf '%s\t%s\t%s\t%s\t%s\n' "$timestamp" "$runtime" "$surface" "$phase" "$encoded" >> "${SHELLRAISER_EVENT_LOG}" + lock_file="${SHELLRAISER_EVENT_LOG}.lock" + /usr/bin/lockf "$lock_file" /bin/sh -c 'printf "%s\t%s\t%s\t%s\t%s\n" "$2" "$3" "$4" "$5" "$6" >> "$1"' sh "${SHELLRAISER_EVENT_LOG}" "$timestamp" "$runtime" "$surface" "$phase" "$encoded" """# } diff --git a/Tests/ShellraiserTests/AgentCompletionEventMonitorTests.swift b/Tests/ShellraiserTests/AgentCompletionEventMonitorTests.swift index 1ee8aff..14cd6b0 100644 --- a/Tests/ShellraiserTests/AgentCompletionEventMonitorTests.swift +++ b/Tests/ShellraiserTests/AgentCompletionEventMonitorTests.swift @@ -60,6 +60,71 @@ final class AgentCompletionEventMonitorTests: XCTestCase { withExtendedLifetime(monitor) {} } + /// Verifies compaction truncates a fully consumed oversized log and tailing resumes from the reset offset. + func testMonitorCompactsOversizedLogAndContinuesReading() throws { + let logURL = try makeLogFile() + let firstSurfaceId = UUID(uuidString: "00000000-0000-0000-0000-000000001603")! + let secondSurfaceId = UUID(uuidString: "00000000-0000-0000-0000-000000001604")! + let firstExpectation = expectation(description: "First event emitted before compaction") + let secondExpectation = expectation(description: "Second event emitted after compaction") + let monitor = AgentCompletionEventMonitor(logURL: logURL, compactionThresholdBytes: 1) + + monitor.onEvent = { event in + switch event.surfaceId { + case firstSurfaceId: + firstExpectation.fulfill() + case secondSurfaceId: + secondExpectation.fulfill() + default: + XCTFail("Unexpected surface id \(event.surfaceId)") + } + } + + appendLine(eventLine(surfaceId: firstSurfaceId, payload: "first"), to: logURL) + wait(for: [firstExpectation], timeout: 1.0) + XCTAssertTrue(waitForCondition(timeout: 1.0) { + self.currentFileSize(of: logURL) == 0 + }) + + appendLine(eventLine(surfaceId: secondSurfaceId, payload: "second"), to: logURL) + wait(for: [secondExpectation], timeout: 1.0) + withExtendedLifetime(monitor) {} + } + + /// Verifies compaction skips truncation when a new append lands after reading but before the lock is acquired. + func testMonitorSkipsCompactionWhenNewEventArrivesDuringRaceWindow() throws { + let logURL = try makeLogFile() + let firstSurfaceId = UUID(uuidString: "00000000-0000-0000-0000-000000001605")! + let secondSurfaceId = UUID(uuidString: "00000000-0000-0000-0000-000000001606")! + let appendExpectation = expectation(description: "Racing append executed") + let firstExpectation = expectation(description: "First event emitted") + let secondExpectation = expectation(description: "Racing event emitted") + let monitor = AgentCompletionEventMonitor( + logURL: logURL, + compactionThresholdBytes: 1, + beforeCompactionAttempt: { + self.appendLine(self.eventLine(surfaceId: secondSurfaceId, payload: "racing"), to: logURL) + appendExpectation.fulfill() + } + ) + + monitor.onEvent = { event in + switch event.surfaceId { + case firstSurfaceId: + firstExpectation.fulfill() + case secondSurfaceId: + secondExpectation.fulfill() + default: + XCTFail("Unexpected surface id \(event.surfaceId)") + } + } + + appendLine(eventLine(surfaceId: firstSurfaceId, payload: "first"), to: logURL) + + wait(for: [appendExpectation, firstExpectation, secondExpectation], timeout: 1.0) + withExtendedLifetime(monitor) {} + } + /// Creates a disposable event log file for monitor tests. private func makeLogFile(initialContents: String = "") throws -> URL { let url = FileManager.default.temporaryDirectory.appendingPathComponent( @@ -72,6 +137,16 @@ final class AgentCompletionEventMonitorTests: XCTestCase { return url } + /// Returns a valid completion-event log line for the provided surface and payload. + private func eventLine( + surfaceId: UUID, + payload: String, + timestamp: String = "2026-03-08T20:32:00Z" + ) -> String { + let encodedPayload = Data(payload.utf8).base64EncodedString() + return "\(timestamp)\tcodex\t\(surfaceId.uuidString)\tcompleted\t\(encodedPayload)" + } + /// Appends a newline-terminated log line to an existing test file. private func appendLine(_ line: String, to url: URL) { let handle = try! FileHandle(forWritingTo: url) @@ -79,4 +154,29 @@ final class AgentCompletionEventMonitorTests: XCTestCase { handle.write(Data((line + "\n").utf8)) try! handle.close() } + + /// Returns the current byte size for the supplied log file. + private func currentFileSize(of url: URL) -> UInt64 { + let attributes = try! FileManager.default.attributesOfItem(atPath: url.path) + let size = attributes[.size] as! NSNumber + return size.uint64Value + } + + /// Polls until the supplied condition becomes true or the timeout elapses. + private func waitForCondition( + timeout: TimeInterval, + interval: TimeInterval = 0.01, + condition: @escaping () -> Bool + ) -> Bool { + let deadline = Date().addingTimeInterval(timeout) + while Date() < deadline { + if condition() { + return true + } + + RunLoop.current.run(until: Date().addingTimeInterval(interval)) + } + + return condition() + } } diff --git a/Tests/ShellraiserTests/AgentRuntimeBridgeTests.swift b/Tests/ShellraiserTests/AgentRuntimeBridgeTests.swift index 0894c62..ca57b40 100644 --- a/Tests/ShellraiserTests/AgentRuntimeBridgeTests.swift +++ b/Tests/ShellraiserTests/AgentRuntimeBridgeTests.swift @@ -45,6 +45,7 @@ final class AgentRuntimeBridgeTests: XCTestCase { func testPrepareRuntimeSupportWritesHelperWithoutBareCodexCase() throws { let bridge = try makeBridge() let helperURL = bridge.binDirectory.appendingPathComponent("shellraiser-agent-complete") + let lockURL = bridge.runtimeDirectory.appendingPathComponent("agent-completions.log.lock") bridge.prepareRuntimeSupport() @@ -52,7 +53,10 @@ final class AgentRuntimeBridgeTests: XCTestCase { XCTAssertTrue(helperContents.contains("codex:completed)")) XCTAssertTrue(helperContents.contains("codex:session|claudeCode:session)")) + XCTAssertTrue(helperContents.contains("/usr/bin/lockf")) + XCTAssertTrue(helperContents.contains("${SHELLRAISER_EVENT_LOG}.lock")) XCTAssertFalse(helperContents.contains("\n codex)\n")) + XCTAssertTrue(FileManager.default.fileExists(atPath: lockURL.path)) } /// Verifies runtime wrappers emit session identity metadata for later resume.