Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Sources/Error/FlowError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public extension Flow {
case invalidScript
case scriptNotFound(name: String, directory: String)
case customError(msg: String)
case createWebSocketFailed

var rawValue: String {
switch self {
Expand Down
3 changes: 1 addition & 2 deletions Sources/Flow.swift
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,9 @@ extension Flow {
/// - returns: A future that will receive the `Flow.TransactionResult` value.
func once(_ transactionId: Flow.ID,
status: Flow.Transaction.Status,
delayInNanoSec: UInt64 = 2_000_000_000,
timeout: TimeInterval = 60) async throws -> Flow.TransactionResult
{
return try await transactionId.once(status: status, delayInNanoSec: delayInNanoSec, timeout: timeout)
return try await transactionId.once(status: status, timeout: timeout)
}

/// Get notified when transaction's status change to `.finalized`.
Expand Down
51 changes: 25 additions & 26 deletions Sources/Models/FlowId.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,35 +81,34 @@ public extension Flow.ID {
/// Get notified when transaction's status changed.
/// - parameters:
/// - status: The status you want to monitor.
/// - delay: Interval between two queries. Default is 2000 milliseconds.
/// - timeout: Timeout for this request. Default is 60 seconds.
/// - timeout: Timeout for this request. Default is 20 seconds.
/// - returns: A future that will receive the `Flow.TransactionResult` value.
func once(status: Flow.Transaction.Status,
delayInNanoSec: UInt64 = 2_000_000_000,
timeout: TimeInterval = 60) async throws -> Flow.TransactionResult
timeout: TimeInterval = 20) async throws -> Flow.TransactionResult
{
let accessAPI = flow.accessAPI
let timeoutDate = Date(timeIntervalSinceNow: timeout)

@Sendable
func makeResultCall() async throws -> Flow.TransactionResult {
let now = Date()
if now > timeoutDate {
// timeout
throw Flow.FError.timeout
}

let result = try await accessAPI.getTransactionResultById(id: self)
if result.status >= status {
// finished
return result
}

// continue loop
try await _Concurrency.Task.sleep(nanoseconds: delayInNanoSec)
return try await makeResultCall()
guard let ws = Flow.Websocket(chainID: flow.chainID, isDebug: true) else {
throw Flow.FError.createWebSocketFailed
}

return try await makeResultCall()

ws.connect()

defer {
ws.disconnect()
}

let result = try await awaitPublisher(
ws.subscribeToTransactionStatus(txId: self)
.filter{ $0.payload?.transactionResult.status ?? .unknown > status }
.first()
.eraseToAnyPublisher()
,
timeout: timeout
)

guard let txResult = result.payload?.transactionResult else {
throw Flow.FError.customError(msg: "Failed to fetch transaction result for - \(self)")
}

return txResult
}
}
77 changes: 59 additions & 18 deletions Sources/Network/Websocket/Websocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ public extension Flow {
private var isConnected = false
private var subscriptions: [String: (subject: PassthroughSubject<Any, Error>, type: Any.Type)] = [:]
private var cancellables = Set<AnyCancellable>()

private var isDebug: Bool = false
private var timeoutInterval: TimeInterval = 10
private let connectionSubject = PassthroughSubject<Void, Never>()
private var isConnecting: Bool = false

private var decoder: JSONDecoder {
let dateFormatter = DateFormatter()
// 2022-06-22T15:32:09.08595992Z
// eg. 2022-06-22T15:32:09.08595992Z
dateFormatter.dateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'"
dateFormatter.locale = Locale(identifier: "en_US_POSIX")
dateFormatter.timeZone = TimeZone(secondsFromGMT: 0)
Expand All @@ -39,21 +41,29 @@ public extension Flow {

private let url: URL

public init(url: URL, timeoutInterval: TimeInterval = 10) {
public init(url: URL, timeoutInterval: TimeInterval = 30, isDebug: Bool = false) {
self.url = url
self.isDebug = isDebug
}

convenience init?(chainID: Flow.ChainID, timeoutInterval: TimeInterval = 10) {
convenience init?(chainID: Flow.ChainID, timeoutInterval: TimeInterval = 30, isDebug: Bool = false) {
guard let node = chainID.defaultWebSocketNode, let url = node.url else { return nil }
self.init(url: url, timeoutInterval: timeoutInterval)
self.init(url: url, timeoutInterval: timeoutInterval, isDebug: isDebug)
}

public func connect() {
guard !isConnected && !isConnecting else { return }
isConnecting = true
var request = URLRequest(url: url)
request.timeoutInterval = timeoutInterval

let pinner = FoundationSecurity(allowSelfSigned: true) // do not validate SSL certificates
socket = WebSocket(request: request, certPinner: pinner)
if isDebug {
let pinner = FoundationSecurity(allowSelfSigned: true) // do not validate SSL certificates
socket = WebSocket(request: request, certPinner: pinner)
} else {
socket = WebSocket(request: request)
}

socket?.delegate = self
socket?.connect()
}
Expand Down Expand Up @@ -108,7 +118,6 @@ public extension Flow {
let publisher = subscribe(topic: .accountStatuses, arguments: request, type: Flow.Websocket.AccountStatusResponse.self)

// Also publish to central publisher for account updates
// Flow.Publisher.shared.publishAccountUpdate(address: Flow.Address(hex: address))

return publisher
}
Expand Down Expand Up @@ -141,19 +150,34 @@ public extension Flow {
}
}

private func subscribe<T: Encodable, U: Decodable>(topic: Topic, arguments: T, type: U.Type) -> AnyPublisher<TopicResponse<U>, Error> {
private func subscribe<T: Encodable, U: Decodable>(
topic: Topic,
arguments: T,
type: U.Type
) -> AnyPublisher<TopicResponse<U>, Error> {
let subscriptionId = generateShortUUID()
let request = SubscribeRequest(id: subscriptionId, action: .subscribe, topic: topic, arguments: arguments)
let subject = PassthroughSubject<Any, Error>()
subscriptions[subscriptionId] = (subject: subject, type: TopicResponse<U>.self)
do {
let data = try encoder.encode(request)
socket?.write(data: data)
} catch {
subject.send(completion: .failure(error))
subscriptions.removeValue(forKey: subscriptionId)
Flow.Publisher.shared.publishError(error)
// If not connected or connecting, initiate connection
if !isConnected && !isConnecting {
connect()
}
// Wait for connection, then send the request
connectedPublisher
.sink { [weak self] in
guard let self = self else { return }
do {
let data = try self.encoder.encode(request)
self.socket?.write(data: data)
} catch {
subject.send(completion: .failure(error))
self.subscriptions.removeValue(forKey: subscriptionId)
Flow.Publisher.shared.publishError(error)
}
}
.store(in: &cancellables)

return subject
.compactMap { value -> TopicResponse<U>? in
return value as? TopicResponse<U>
Expand All @@ -180,6 +204,16 @@ public extension Flow {
let fullUUID = UUID().uuidString
return String(fullUUID.prefix(20))
}

private var connectedPublisher: AnyPublisher<Void, Never> {
if isConnected {
// Immediately emit if already connected
return Just(()).eraseToAnyPublisher()
} else {
// Wait for the next connection event
return connectionSubject.prefix(1).eraseToAnyPublisher()
}
}
}
}

Expand All @@ -190,10 +224,13 @@ extension Flow.Websocket: WebSocketDelegate {
switch event {
case .connected:
isConnected = true
isConnecting = false
connectionSubject.send(())
Flow.Publisher.shared.publishConnectionStatus(isConnected: true)

case .disconnected(_, _):
isConnected = false
isConnecting = false
Flow.Publisher.shared.publishConnectionStatus(isConnected: false)

case .text(let string):
Expand Down Expand Up @@ -234,8 +271,12 @@ extension Flow.Websocket: WebSocketDelegate {
print("Active subscriptions: \(response.subscriptions)")
return
}
let object = try JSONSerialization.jsonObject(with: data)
print(object)

if isDebug {
let object = try JSONSerialization.jsonObject(with: data)
print(object)
}

if let _ = try? decoder.decode(SubscribeResponse.self, from: data) {
return
}
Expand Down
34 changes: 2 additions & 32 deletions Tests/FlowTests/WebSocketTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,14 @@ final class WebSocketTests: XCTestCase {
super.tearDown()
}

func awaitConnection() async throws {
let result = try await awaitPublisher(
Flow.Publisher.shared.connectionPublisher
.filter { $0 == true }
.first()
)

print(result)
}

func testWebSocketConnection() async throws {
try await awaitConnection()
}

func testWebSocketDisconnection() async throws {
try await awaitConnection()
Flow.Publisher.shared.connectionPublisher
.sink(receiveValue: { connect in
print(connect)
})
websocket.disconnect()

// XCTAssertEqual(connect, false)
}

func testBlockDigestSubscription() async throws {
try await awaitConnection()
let blockHeader = try await awaitPublisher(
websocket.subscribeToBlockDigests()
)
XCTAssertNotNil(blockHeader)
}

func testTransactionStatusSubscription() async throws {
try await awaitConnection()
let testTxId = "5ab8b0bec5ee89c63c5c33ddc4144f3772d0eeda0e85e905fc7e41c2d449269f"
websocket.subscribeToTransactionStatus(txId: .init(hex: testTxId))
let status = try await awaitPublisher(
Expand All @@ -63,21 +36,18 @@ final class WebSocketTests: XCTestCase {

print(status)
XCTAssertNotNil(status)
websocket.disconnect()
}

func testAccountStatusSubscription() async throws {
try await awaitConnection()

let testAddress = "0x418c09f201f67f89"
let account = try await awaitPublisher(
websocket.subscribeToAccountStatuses(request: .init(heartbeatInterval: "10", accountAddresses: [testAddress]))
)
XCTAssertNotNil(account)
websocket.disconnect()
}

func testListSubscriptions() async throws {
try await awaitConnection()
// TODO
XCTAssertTrue(true)
}
}
Loading