From a9d8cfce16e7d4bfa03833774f74c582d6140b0d Mon Sep 17 00:00:00 2001 From: lmcmz Date: Fri, 9 May 2025 00:29:36 +1000 Subject: [PATCH] Update websocket --- Sources/Error/FlowError.swift | 1 + Sources/Flow.swift | 3 +- Sources/Models/FlowId.swift | 51 ++++++++------- Sources/Network/Websocket/Websocket.swift | 77 +++++++++++++++++------ Tests/FlowTests/WebSocketTests.swift | 34 +--------- 5 files changed, 88 insertions(+), 78 deletions(-) diff --git a/Sources/Error/FlowError.swift b/Sources/Error/FlowError.swift index b24a87e..7a1340a 100644 --- a/Sources/Error/FlowError.swift +++ b/Sources/Error/FlowError.swift @@ -39,6 +39,7 @@ public extension Flow { case invalidScript case scriptNotFound(name: String, directory: String) case customError(msg: String) + case createWebSocketFailed var rawValue: String { switch self { diff --git a/Sources/Flow.swift b/Sources/Flow.swift index 1f78cb2..f7a69fd 100644 --- a/Sources/Flow.swift +++ b/Sources/Flow.swift @@ -135,10 +135,9 @@ extension Flow { /// - returns: A future that will receive the `Flow.TransactionResult` value. func once(_ transactionId: Flow.ID, status: Flow.Transaction.Status, - delayInNanoSec: UInt64 = 2_000_000_000, timeout: TimeInterval = 60) async throws -> Flow.TransactionResult { - return try await transactionId.once(status: status, delayInNanoSec: delayInNanoSec, timeout: timeout) + return try await transactionId.once(status: status, timeout: timeout) } /// Get notified when transaction's status change to `.finalized`. diff --git a/Sources/Models/FlowId.swift b/Sources/Models/FlowId.swift index 54a0627..95f88a8 100644 --- a/Sources/Models/FlowId.swift +++ b/Sources/Models/FlowId.swift @@ -81,35 +81,34 @@ public extension Flow.ID { /// Get notified when transaction's status changed. /// - parameters: /// - status: The status you want to monitor. - /// - delay: Interval between two queries. Default is 2000 milliseconds. - /// - timeout: Timeout for this request. Default is 60 seconds. + /// - timeout: Timeout for this request. Default is 20 seconds. /// - returns: A future that will receive the `Flow.TransactionResult` value. func once(status: Flow.Transaction.Status, - delayInNanoSec: UInt64 = 2_000_000_000, - timeout: TimeInterval = 60) async throws -> Flow.TransactionResult + timeout: TimeInterval = 20) async throws -> Flow.TransactionResult { - let accessAPI = flow.accessAPI - let timeoutDate = Date(timeIntervalSinceNow: timeout) - - @Sendable - func makeResultCall() async throws -> Flow.TransactionResult { - let now = Date() - if now > timeoutDate { - // timeout - throw Flow.FError.timeout - } - - let result = try await accessAPI.getTransactionResultById(id: self) - if result.status >= status { - // finished - return result - } - - // continue loop - try await _Concurrency.Task.sleep(nanoseconds: delayInNanoSec) - return try await makeResultCall() + guard let ws = Flow.Websocket(chainID: flow.chainID, isDebug: true) else { + throw Flow.FError.createWebSocketFailed } - - return try await makeResultCall() + + ws.connect() + + defer { + ws.disconnect() + } + + let result = try await awaitPublisher( + ws.subscribeToTransactionStatus(txId: self) + .filter{ $0.payload?.transactionResult.status ?? .unknown > status } + .first() + .eraseToAnyPublisher() + , + timeout: timeout + ) + + guard let txResult = result.payload?.transactionResult else { + throw Flow.FError.customError(msg: "Failed to fetch transaction result for - \(self)") + } + + return txResult } } diff --git a/Sources/Network/Websocket/Websocket.swift b/Sources/Network/Websocket/Websocket.swift index bd11152..2266428 100644 --- a/Sources/Network/Websocket/Websocket.swift +++ b/Sources/Network/Websocket/Websocket.swift @@ -15,12 +15,14 @@ public extension Flow { private var isConnected = false private var subscriptions: [String: (subject: PassthroughSubject, type: Any.Type)] = [:] private var cancellables = Set() - + private var isDebug: Bool = false private var timeoutInterval: TimeInterval = 10 + private let connectionSubject = PassthroughSubject() + private var isConnecting: Bool = false private var decoder: JSONDecoder { let dateFormatter = DateFormatter() - // 2022-06-22T15:32:09.08595992Z + // eg. 2022-06-22T15:32:09.08595992Z dateFormatter.dateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'" dateFormatter.locale = Locale(identifier: "en_US_POSIX") dateFormatter.timeZone = TimeZone(secondsFromGMT: 0) @@ -39,21 +41,29 @@ public extension Flow { private let url: URL - public init(url: URL, timeoutInterval: TimeInterval = 10) { + public init(url: URL, timeoutInterval: TimeInterval = 30, isDebug: Bool = false) { self.url = url + self.isDebug = isDebug } - convenience init?(chainID: Flow.ChainID, timeoutInterval: TimeInterval = 10) { + convenience init?(chainID: Flow.ChainID, timeoutInterval: TimeInterval = 30, isDebug: Bool = false) { guard let node = chainID.defaultWebSocketNode, let url = node.url else { return nil } - self.init(url: url, timeoutInterval: timeoutInterval) + self.init(url: url, timeoutInterval: timeoutInterval, isDebug: isDebug) } public func connect() { + guard !isConnected && !isConnecting else { return } + isConnecting = true var request = URLRequest(url: url) request.timeoutInterval = timeoutInterval - let pinner = FoundationSecurity(allowSelfSigned: true) // do not validate SSL certificates - socket = WebSocket(request: request, certPinner: pinner) + if isDebug { + let pinner = FoundationSecurity(allowSelfSigned: true) // do not validate SSL certificates + socket = WebSocket(request: request, certPinner: pinner) + } else { + socket = WebSocket(request: request) + } + socket?.delegate = self socket?.connect() } @@ -108,7 +118,6 @@ public extension Flow { let publisher = subscribe(topic: .accountStatuses, arguments: request, type: Flow.Websocket.AccountStatusResponse.self) // Also publish to central publisher for account updates -// Flow.Publisher.shared.publishAccountUpdate(address: Flow.Address(hex: address)) return publisher } @@ -141,19 +150,34 @@ public extension Flow { } } - private func subscribe(topic: Topic, arguments: T, type: U.Type) -> AnyPublisher, Error> { + private func subscribe( + topic: Topic, + arguments: T, + type: U.Type + ) -> AnyPublisher, Error> { let subscriptionId = generateShortUUID() let request = SubscribeRequest(id: subscriptionId, action: .subscribe, topic: topic, arguments: arguments) let subject = PassthroughSubject() subscriptions[subscriptionId] = (subject: subject, type: TopicResponse.self) - do { - let data = try encoder.encode(request) - socket?.write(data: data) - } catch { - subject.send(completion: .failure(error)) - subscriptions.removeValue(forKey: subscriptionId) - Flow.Publisher.shared.publishError(error) + // If not connected or connecting, initiate connection + if !isConnected && !isConnecting { + connect() } + // Wait for connection, then send the request + connectedPublisher + .sink { [weak self] in + guard let self = self else { return } + do { + let data = try self.encoder.encode(request) + self.socket?.write(data: data) + } catch { + subject.send(completion: .failure(error)) + self.subscriptions.removeValue(forKey: subscriptionId) + Flow.Publisher.shared.publishError(error) + } + } + .store(in: &cancellables) + return subject .compactMap { value -> TopicResponse? in return value as? TopicResponse @@ -180,6 +204,16 @@ public extension Flow { let fullUUID = UUID().uuidString return String(fullUUID.prefix(20)) } + + private var connectedPublisher: AnyPublisher { + if isConnected { + // Immediately emit if already connected + return Just(()).eraseToAnyPublisher() + } else { + // Wait for the next connection event + return connectionSubject.prefix(1).eraseToAnyPublisher() + } + } } } @@ -190,10 +224,13 @@ extension Flow.Websocket: WebSocketDelegate { switch event { case .connected: isConnected = true + isConnecting = false + connectionSubject.send(()) Flow.Publisher.shared.publishConnectionStatus(isConnected: true) case .disconnected(_, _): isConnected = false + isConnecting = false Flow.Publisher.shared.publishConnectionStatus(isConnected: false) case .text(let string): @@ -234,8 +271,12 @@ extension Flow.Websocket: WebSocketDelegate { print("Active subscriptions: \(response.subscriptions)") return } - let object = try JSONSerialization.jsonObject(with: data) - print(object) + + if isDebug { + let object = try JSONSerialization.jsonObject(with: data) + print(object) + } + if let _ = try? decoder.decode(SubscribeResponse.self, from: data) { return } diff --git a/Tests/FlowTests/WebSocketTests.swift b/Tests/FlowTests/WebSocketTests.swift index 48535a9..7648534 100644 --- a/Tests/FlowTests/WebSocketTests.swift +++ b/Tests/FlowTests/WebSocketTests.swift @@ -18,33 +18,7 @@ final class WebSocketTests: XCTestCase { super.tearDown() } - func awaitConnection() async throws { - let result = try await awaitPublisher( - Flow.Publisher.shared.connectionPublisher - .filter { $0 == true } - .first() - ) - - print(result) - } - - func testWebSocketConnection() async throws { - try await awaitConnection() - } - - func testWebSocketDisconnection() async throws { - try await awaitConnection() - Flow.Publisher.shared.connectionPublisher - .sink(receiveValue: { connect in - print(connect) - }) - websocket.disconnect() - -// XCTAssertEqual(connect, false) - } - func testBlockDigestSubscription() async throws { - try await awaitConnection() let blockHeader = try await awaitPublisher( websocket.subscribeToBlockDigests() ) @@ -52,7 +26,6 @@ final class WebSocketTests: XCTestCase { } func testTransactionStatusSubscription() async throws { - try await awaitConnection() let testTxId = "5ab8b0bec5ee89c63c5c33ddc4144f3772d0eeda0e85e905fc7e41c2d449269f" websocket.subscribeToTransactionStatus(txId: .init(hex: testTxId)) let status = try await awaitPublisher( @@ -63,21 +36,18 @@ final class WebSocketTests: XCTestCase { print(status) XCTAssertNotNil(status) - websocket.disconnect() } func testAccountStatusSubscription() async throws { - try await awaitConnection() - let testAddress = "0x418c09f201f67f89" let account = try await awaitPublisher( websocket.subscribeToAccountStatuses(request: .init(heartbeatInterval: "10", accountAddresses: [testAddress])) ) XCTAssertNotNil(account) - websocket.disconnect() } func testListSubscriptions() async throws { - try await awaitConnection() + // TODO + XCTAssertTrue(true) } }