From 020ba1f5e99d5c4c2768c8d6eaa11c76c5cc6991 Mon Sep 17 00:00:00 2001 From: mertushka Date: Tue, 16 Jun 2026 05:19:37 +0300 Subject: [PATCH] deps(transport-webrtc): replace node-datachannel --- packages/transport-webrtc/package.json | 2 +- .../private-to-private/initiate-connection.ts | 19 +-- .../src/private-to-private/transport.ts | 14 -- .../src/private-to-public/utils/connect.ts | 1 - .../utils/get-rtcpeerconnection.ts | 124 +++++------------- .../private-to-public/utils/stun-listener.ts | 7 +- .../src/rtcpeerconnection-to-conn.ts | 6 +- packages/transport-webrtc/src/stream.ts | 12 +- packages/transport-webrtc/src/util.ts | 3 +- packages/transport-webrtc/src/webrtc/index.ts | 13 +- packages/transport-webrtc/test/maconn.spec.ts | 1 - packages/transport-webrtc/test/peer.spec.ts | 18 ++- packages/transport-webrtc/test/stream.spec.ts | 62 ++------- 13 files changed, 88 insertions(+), 194 deletions(-) diff --git a/packages/transport-webrtc/package.json b/packages/transport-webrtc/package.json index c35f99dbed..b2ae140335 100644 --- a/packages/transport-webrtc/package.json +++ b/packages/transport-webrtc/package.json @@ -53,6 +53,7 @@ "@libp2p/keychain": "^6.1.3", "@libp2p/peer-id": "^6.0.11", "@libp2p/utils": "^7.2.3", + "@mertushka/webrtc-node": "^0.2.0", "@multiformats/multiaddr": "^13.0.3", "@multiformats/multiaddr-matcher": "^3.0.2", "@peculiar/webcrypto": "^1.5.0", @@ -65,7 +66,6 @@ "it-stream-types": "^2.0.2", "main-event": "^1.0.1", "multiformats": "^14.0.0", - "node-datachannel": "^0.32.3", "p-defer": "^4.0.1", "p-event": "^7.0.0", "p-timeout": "^7.0.0", diff --git a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts index 062e583ce6..35958c0238 100644 --- a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts +++ b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts @@ -65,23 +65,15 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa runOnLimitedConnection: true }) + if (signal?.aborted === true) { + stream.abort(signal.reason) + signal.throwIfAborted() + } + const messageStream = pbStream(stream).pb(Message) const peerConnection = new RTCPeerConnection(rtcConfiguration) - // make sure C++ peer connection is garbage collected - // https://github.com/murat-dogan/node-datachannel/issues/366#issuecomment-3228453155 - peerConnection.addEventListener('connectionstatechange', () => { - switch (peerConnection.connectionState) { - case 'closed': - peerConnection.close() - break - default: - break - } - }) - const muxerFactory = new DataChannelMuxerFactory({ - // @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370 peerConnection, dataChannelOptions: dataChannel }) @@ -209,7 +201,6 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa return { remoteAddress: ma, - // @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370 peerConnection, muxerFactory } diff --git a/packages/transport-webrtc/src/private-to-private/transport.ts b/packages/transport-webrtc/src/private-to-private/transport.ts index 6b4fcc6a0d..4124d6499e 100644 --- a/packages/transport-webrtc/src/private-to-private/transport.ts +++ b/packages/transport-webrtc/src/private-to-private/transport.ts @@ -199,19 +199,7 @@ export class WebRTCTransport implements Transport, Startable { async _onProtocol (stream: Stream, connection: Connection, signal: AbortSignal): Promise { const peerConnection = new RTCPeerConnection(await getRtcConfiguration(this.init.rtcConfiguration)) - // make sure C++ peer connection is garbage collected - // https://github.com/murat-dogan/node-datachannel/issues/366#issuecomment-3228453155 - peerConnection.addEventListener('connectionstatechange', () => { - switch (peerConnection.connectionState) { - case 'closed': - peerConnection.close() - break - default: - break - } - }) const muxerFactory = new DataChannelMuxerFactory({ - // @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370 peerConnection, dataChannelOptions: this.init.dataChannel }) @@ -229,7 +217,6 @@ export class WebRTCTransport implements Transport, Startable { }) const webRTCConn = toMultiaddrConnection({ - // @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370 peerConnection, remoteAddr: remoteAddress, metrics: this.metrics?.listenerEvents, @@ -246,7 +233,6 @@ export class WebRTCTransport implements Transport, Startable { }) // close the connection on shut down - // @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370 this._closeOnShutdown(peerConnection, webRTCConn) } catch (err: any) { this.log.error('incoming signaling error - %e', err) diff --git a/packages/transport-webrtc/src/private-to-public/utils/connect.ts b/packages/transport-webrtc/src/private-to-public/utils/connect.ts index 4aae1e5c49..c5737d664e 100644 --- a/packages/transport-webrtc/src/private-to-public/utils/connect.ts +++ b/packages/transport-webrtc/src/private-to-public/utils/connect.ts @@ -124,7 +124,6 @@ export async function connect (peerConnection: RTCPeerConnection | DirectRTCPeer // Creating the connection before completion of the noise // handshake ensures that the stream opening callback is set up const maConn = toMultiaddrConnection({ - // @ts-expect-error types are broken peerConnection, remoteAddr: options.remoteAddr, metrics: options.events, diff --git a/packages/transport-webrtc/src/private-to-public/utils/get-rtcpeerconnection.ts b/packages/transport-webrtc/src/private-to-public/utils/get-rtcpeerconnection.ts index dcd2c37770..5de7a4c1f5 100644 --- a/packages/transport-webrtc/src/private-to-public/utils/get-rtcpeerconnection.ts +++ b/packages/transport-webrtc/src/private-to-public/utils/get-rtcpeerconnection.ts @@ -1,90 +1,21 @@ import { Crypto } from '@peculiar/webcrypto' -import { PeerConnection } from 'node-datachannel' -import { RTCPeerConnection } from 'node-datachannel/polyfill' +import * as WebRTCNode from '@mertushka/webrtc-node' import { DEFAULT_ICE_SERVERS, MAX_MESSAGE_SIZE } from '../../constants.ts' import { DataChannelMuxerFactory } from '../../muxer.ts' import { generateTransportCertificate } from './generate-certificates.ts' import type { DataChannelOptions, TransportCertificate } from '../../index.ts' import type { CounterGroup } from '@libp2p/interface' -import type { CertificateFingerprint } from 'node-datachannel' const crypto = new Crypto() - -interface DirectRTCPeerConnectionInit extends RTCConfiguration { - ufrag: string - peerConnection: PeerConnection -} - -export class DirectRTCPeerConnection extends RTCPeerConnection { - private peerConnection: PeerConnection - private readonly ufrag: string - - constructor (init: DirectRTCPeerConnectionInit) { - super(init) - - this.peerConnection = init.peerConnection - this.ufrag = init.ufrag - - // make sure C++ peer connection is garbage collected - // https://github.com/murat-dogan/node-datachannel/issues/366#issuecomment-3228453155 - this.addEventListener('connectionstatechange', () => { - switch (this.connectionState) { - case 'closed': - this.peerConnection.close() - break - default: - break - } - }) - } - - async createOffer (): Promise { - // have to set ufrag before creating offer - if (this.connectionState === 'new') { - this.peerConnection?.setLocalDescription('offer', { - iceUfrag: this.ufrag, - icePwd: this.ufrag - }) - } - - return super.createOffer() - } - - async createAnswer (): Promise { - // have to set ufrag before creating answer - if (this.connectionState === 'new') { - this.peerConnection?.setLocalDescription('answer', { - iceUfrag: this.ufrag, - icePwd: this.ufrag - }) - } - - return super.createAnswer() - } - - remoteFingerprint (): CertificateFingerprint { - if (this.peerConnection == null) { - throw new Error('Invalid state: peer connection not set') - } - - return this.peerConnection.remoteFingerprint() - } -} - -function mapIceServers (iceServers?: RTCIceServer[]): string[] { - return iceServers - ?.map((server) => { - const urls = Array.isArray(server.urls) ? server.urls : [server.urls] - - return urls.map((url) => { - if (server.username != null && server.credential != null) { - const [protocol, rest] = url.split(/:(.*)/) - return `${protocol}:${server.username}:${server.credential}@${rest}` - } - return url - }) - }) - .flat() ?? [] +const webRTCNode = (WebRTCNode as unknown as { default?: typeof WebRTCNode }).default ?? WebRTCNode +const { + RTCPeerConnection: NodeRTCPeerConnection, + nonstandard +} = webRTCNode +const RTCPeerConnection = NodeRTCPeerConnection as unknown as typeof globalThis.RTCPeerConnection + +export interface DirectRTCPeerConnection extends globalThis.RTCPeerConnection { + remoteFingerprint(): ReturnType } export interface CreateDialerRTCPeerConnectionOptions { @@ -113,23 +44,32 @@ export async function createDialerRTCPeerConnection (role: 'client' | 'server', } const rtcConfig = typeof options.rtcConfiguration === 'function' ? await options.rtcConfiguration() : options.rtcConfiguration + const certificate = nonstandard.importCertificate({ + certificatePem: options.certificate.pem, + privateKeyPem: options.certificate.privateKey + }) - const peerConnection = new DirectRTCPeerConnection({ - ...rtcConfig, - ufrag, - peerConnection: new PeerConnection(`${role}-${Date.now()}`, { - disableFingerprintVerification: true, - disableAutoNegotiation: true, - certificatePemFile: options.certificate.pem, - keyPemFile: options.certificate.privateKey, - enableIceUdpMux: role === 'server', - maxMessageSize: MAX_MESSAGE_SIZE, - iceServers: mapIceServers(rtcConfig?.iceServers ?? DEFAULT_ICE_SERVERS.map(urls => ({ urls }))) - }) + const peerConnection = new RTCPeerConnection({ + ...(rtcConfig ?? {}), + certificates: [certificate as unknown as globalThis.RTCCertificate], + iceServers: rtcConfig?.iceServers ?? DEFAULT_ICE_SERVERS.map(urls => ({ urls })) + }) as DirectRTCPeerConnection + const nonstandardPeerConnection = peerConnection as unknown as InstanceType + + nonstandard.configurePeerConnection(nonstandardPeerConnection, { + disableFingerprintVerification: true, + enableIceUdpMux: role === 'server', + maxMessageSize: MAX_MESSAGE_SIZE }) + nonstandard.setLocalIceCredentials(nonstandardPeerConnection, { + iceUfrag: ufrag, + icePwd: ufrag + }) + + peerConnection.remoteFingerprint = () => nonstandard.getRemoteFingerprint(nonstandardPeerConnection) + const muxerFactory = new DataChannelMuxerFactory({ - // @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370 peerConnection, metrics: options.events, dataChannelOptions: options.dataChannel diff --git a/packages/transport-webrtc/src/private-to-public/utils/stun-listener.ts b/packages/transport-webrtc/src/private-to-public/utils/stun-listener.ts index dfbf65c195..6f9f13bb0e 100644 --- a/packages/transport-webrtc/src/private-to-public/utils/stun-listener.ts +++ b/packages/transport-webrtc/src/private-to-public/utils/stun-listener.ts @@ -1,8 +1,11 @@ import { isIPv4 } from '@chainsafe/is-ip' -import { IceUdpMuxListener } from 'node-datachannel' +import * as WebRTCNode from '@mertushka/webrtc-node' import type { Logger } from '@libp2p/interface' import type { AddressInfo } from 'node:net' +const webRTCNode = (WebRTCNode as unknown as { default?: typeof WebRTCNode }).default ?? WebRTCNode +const { nonstandard } = webRTCNode + export interface StunServer { close(): Promise address(): AddressInfo @@ -13,7 +16,7 @@ export interface Callback { } export async function stunListener (host: string, port: number, log: Logger, cb: Callback): Promise { - const listener = new IceUdpMuxListener(port, host) + const listener = new nonstandard.IceUdpMuxListener(port, host) listener.onUnhandledStunRequest(request => { if (request.ufrag == null) { return diff --git a/packages/transport-webrtc/src/rtcpeerconnection-to-conn.ts b/packages/transport-webrtc/src/rtcpeerconnection-to-conn.ts index e19fb271ef..6eaeedad7b 100644 --- a/packages/transport-webrtc/src/rtcpeerconnection-to-conn.ts +++ b/packages/transport-webrtc/src/rtcpeerconnection-to-conn.ts @@ -24,9 +24,9 @@ class RTCPeerConnectionMultiaddrConnection extends AbstractMultiaddrConnection { // nothing else to do but close the connection this.onTransportClosed() - // only necessary with node-datachannel - // https://github.com/murat-dogan/node-datachannel/issues/366#issuecomment-3228453155 - this.peerConnection.close() + if (this.peerConnection.connectionState === 'failed') { + this.peerConnection.close() + } } } } diff --git a/packages/transport-webrtc/src/stream.ts b/packages/transport-webrtc/src/stream.ts index a75229c7ff..aeb5d11816 100644 --- a/packages/transport-webrtc/src/stream.ts +++ b/packages/transport-webrtc/src/stream.ts @@ -140,15 +140,9 @@ export class WebRTCStream extends AbstractStream { this.log.trace('sending message, channel state "%s"', this.channel.readyState) - try { - // send message without copying data - for (const buf of data) { - this.channel.send(withArrayBuffer(buf)) - } - } catch (err: any) { - // channel.send can throw synchronously if the polyfill's cached readyState is stale - this.log.error('error sending datachannel message - %e', err) - this.abort(err) + // send message without copying data + for (const buf of data) { + this.channel.send(withArrayBuffer(buf)) } } diff --git a/packages/transport-webrtc/src/util.ts b/packages/transport-webrtc/src/util.ts index 80225e5d02..b5cfa7b126 100644 --- a/packages/transport-webrtc/src/util.ts +++ b/packages/transport-webrtc/src/util.ts @@ -3,7 +3,6 @@ import pTimeout from 'p-timeout' import { DATA_CHANNEL_DRAIN_TIMEOUT, DEFAULT_ICE_SERVERS, UFRAG_ALPHABET, UFRAG_PREFIX } from './constants.ts' import type { LoggerOptions } from '@libp2p/interface' import type { Duplex, Source } from 'it-stream-types' -import type { PeerConnection } from 'node-datachannel' export const nopSource = async function * nop (): AsyncGenerator {} @@ -82,7 +81,7 @@ export interface AbortPromiseOptions { message?: string } -export function isPeerConnection (obj: any): obj is PeerConnection { +export function isPeerConnection (obj: any): obj is { state(): RTCPeerConnectionState } { return typeof obj.state === 'function' } diff --git a/packages/transport-webrtc/src/webrtc/index.ts b/packages/transport-webrtc/src/webrtc/index.ts index 3f1bd1580e..23f32685df 100644 --- a/packages/transport-webrtc/src/webrtc/index.ts +++ b/packages/transport-webrtc/src/webrtc/index.ts @@ -1 +1,12 @@ -export { RTCSessionDescription, RTCIceCandidate, RTCPeerConnection } from 'node-datachannel/polyfill' +import * as WebRTCNode from '@mertushka/webrtc-node' + +const webRTCNode = (WebRTCNode as unknown as { default?: typeof WebRTCNode }).default ?? WebRTCNode +const { + RTCIceCandidate: NodeRTCIceCandidate, + RTCPeerConnection: NodeRTCPeerConnection, + RTCSessionDescription: NodeRTCSessionDescription +} = webRTCNode + +export const RTCSessionDescription = NodeRTCSessionDescription as unknown as typeof globalThis.RTCSessionDescription +export const RTCIceCandidate = NodeRTCIceCandidate as unknown as typeof globalThis.RTCIceCandidate +export const RTCPeerConnection = NodeRTCPeerConnection as unknown as typeof globalThis.RTCPeerConnection diff --git a/packages/transport-webrtc/test/maconn.spec.ts b/packages/transport-webrtc/test/maconn.spec.ts index 3731eccde9..998bd1e2e4 100644 --- a/packages/transport-webrtc/test/maconn.spec.ts +++ b/packages/transport-webrtc/test/maconn.spec.ts @@ -18,7 +18,6 @@ describe('Multiaddr Connection', () => { reset: () => {} }) const maConn = toMultiaddrConnection({ - // @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370 peerConnection, remoteAddr, metrics, diff --git a/packages/transport-webrtc/test/peer.spec.ts b/packages/transport-webrtc/test/peer.spec.ts index 2ad6b8d477..709911bb56 100644 --- a/packages/transport-webrtc/test/peer.spec.ts +++ b/packages/transport-webrtc/test/peer.spec.ts @@ -44,6 +44,14 @@ interface PrivateToPrivateComponents { recipient: Recipient } +function closePeerConnection (peerConnection?: RTCPeerConnection): void { + if (peerConnection == null || peerConnection.connectionState === 'closed') { + return + } + + peerConnection.close() +} + async function getComponents (): Promise { const relayPeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) const initiatorPeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) @@ -82,8 +90,8 @@ describe('webrtc basic', () => { let initiatorPeerConnection: RTCPeerConnection afterEach(() => { - initiatorPeerConnection?.close() - recipient?.peerConnection?.close() + closePeerConnection(initiatorPeerConnection) + closePeerConnection(recipient?.peerConnection) }) it('should connect', async () => { @@ -146,7 +154,7 @@ describe('webrtc receiver', () => { let recipient: Recipient afterEach(() => { - recipient?.peerConnection?.close() + closePeerConnection(recipient?.peerConnection) }) it('should fail receiving on invalid sdp offer', async () => { @@ -164,7 +172,7 @@ describe('webrtc dialer', () => { let recipient: Recipient afterEach(() => { - recipient?.peerConnection?.close() + closePeerConnection(recipient?.peerConnection) }) it('should fail receiving on invalid sdp answer', async () => { @@ -218,7 +226,7 @@ describe('webrtc dialer', () => { await expect(initiatorPeerConnectionPromise).to.be.rejectedWith(/Remote should send an SDP answer/) - pc.close() + closePeerConnection(pc) }) }) diff --git a/packages/transport-webrtc/test/stream.spec.ts b/packages/transport-webrtc/test/stream.spec.ts index 03e0354118..4ebc752391 100644 --- a/packages/transport-webrtc/test/stream.spec.ts +++ b/packages/transport-webrtc/test/stream.spec.ts @@ -5,7 +5,6 @@ import * as lengthPrefixed from 'it-length-prefixed' import { bytes } from 'multiformats' import { pEvent } from 'p-event' import { stubInterface } from 'sinon-ts' -import { isNode, isElectronMain } from 'wherearewe' import { MAX_MESSAGE_SIZE, PROTOBUF_OVERHEAD } from '../src/constants.ts' import { Message } from '../src/private-to-public/pb/message.ts' import { createStream } from '../src/stream.ts' @@ -74,64 +73,29 @@ describe('Max message size', () => { }) describe('Datachannel send errors', () => { - let pcA: RTCPeerConnection - let pcB: RTCPeerConnection - - afterEach(() => { - pcA?.close() - pcB?.close() - }) - - it('aborts the stream when underlying datachannel is closed mid-send', async function () { - // polyfill-specific race; native browser WebRTC doesn't exhibit it - if (!isNode && !isElectronMain) { - return this.skip() - } - - // the node-datachannel polyfill's cached readyState updates on the next - // tick after onClosed fires, so closing the peer leaves a window where - // send() passes the guard and hits an already-closed native channel - pcA = new RTCPeerConnection() - pcB = new RTCPeerConnection() - const channelA = pcA.createDataChannel('test', { negotiated: true, id: 91 }) - const channelB = pcB.createDataChannel('test', { negotiated: true, id: 91 }) - - pcA.onicecandidate = ({ candidate }) => { - if (candidate != null) { - pcB.addIceCandidate(candidate).catch(() => {}) - } - } - pcB.onicecandidate = ({ candidate }) => { - if (candidate != null) { - pcA.addIceCandidate(candidate).catch(() => {}) + it('propagates channel.send errors', async () => { + const error = new Error('send failed') + const channel = stubInterface({ + readyState: 'open', + bufferedAmount: 0, + send: () => { + throw error } - } - - await pcA.setLocalDescription(await pcA.createOffer()) - await pcB.setRemoteDescription(pcA.localDescription as RTCSessionDescriptionInit) - await pcB.setLocalDescription(await pcB.createAnswer()) - await pcA.setRemoteDescription(pcB.localDescription as RTCSessionDescriptionInit) - - await Promise.all([ - pEvent(channelA, 'open', { rejectionEvents: ['close', 'error'] }), - pEvent(channelB, 'open', { rejectionEvents: ['close', 'error'] }) - ]) - + }) const webrtcStream = createStream({ - channel: channelA, + channel, direction: 'outbound', closeTimeout: 1, log: defaultLogger().forComponent('test') }) - pcA.close() - expect(channelA.readyState).to.equal('open') - const closeEventPromise = pEvent<'close', StreamCloseEvent>(webrtcStream, 'close') - webrtcStream.send(new Uint8Array([1, 2, 3, 4])) + expect(() => { + webrtcStream.send(new Uint8Array([1, 2, 3, 4])) + }).to.throw('send failed') const closeEvent = await closeEventPromise - expect(closeEvent.error).to.exist() + expect(closeEvent.error).to.equal(error) expect(webrtcStream.status).to.equal('aborted') }) })