Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/transport-webrtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"@libp2p/keychain": "^6.1.3",
"@libp2p/peer-id": "^6.0.11",
"@libp2p/utils": "^7.2.3",
"@mertushka/webrtc-node": "^0.2.0",
"@multiformats/multiaddr": "^13.0.3",
"@multiformats/multiaddr-matcher": "^3.0.2",
"@peculiar/webcrypto": "^1.5.0",
Expand All @@ -65,7 +66,6 @@
"it-stream-types": "^2.0.2",
"main-event": "^1.0.1",
"multiformats": "^14.0.0",
"node-datachannel": "^0.32.3",
"p-defer": "^4.0.1",
"p-event": "^7.0.0",
"p-timeout": "^7.0.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,15 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa
runOnLimitedConnection: true
})

if (signal?.aborted === true) {
stream.abort(signal.reason)
signal.throwIfAborted()
}

const messageStream = pbStream(stream).pb(Message)
const peerConnection = new RTCPeerConnection(rtcConfiguration)

// make sure C++ peer connection is garbage collected
// https://github.com/murat-dogan/node-datachannel/issues/366#issuecomment-3228453155
peerConnection.addEventListener('connectionstatechange', () => {
switch (peerConnection.connectionState) {
case 'closed':
peerConnection.close()
break
default:
break
}
})

const muxerFactory = new DataChannelMuxerFactory({
// @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370
peerConnection,
dataChannelOptions: dataChannel
})
Expand Down Expand Up @@ -209,7 +201,6 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa

return {
remoteAddress: ma,
// @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370
peerConnection,
muxerFactory
}
Expand Down
14 changes: 0 additions & 14 deletions packages/transport-webrtc/src/private-to-private/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,19 +199,7 @@ export class WebRTCTransport implements Transport<WebRTCDialEvents>, Startable {
async _onProtocol (stream: Stream, connection: Connection, signal: AbortSignal): Promise<void> {
const peerConnection = new RTCPeerConnection(await getRtcConfiguration(this.init.rtcConfiguration))

// make sure C++ peer connection is garbage collected
// https://github.com/murat-dogan/node-datachannel/issues/366#issuecomment-3228453155
peerConnection.addEventListener('connectionstatechange', () => {
switch (peerConnection.connectionState) {
case 'closed':
peerConnection.close()
break
default:
break
}
})
const muxerFactory = new DataChannelMuxerFactory({
// @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370
peerConnection,
dataChannelOptions: this.init.dataChannel
})
Expand All @@ -229,7 +217,6 @@ export class WebRTCTransport implements Transport<WebRTCDialEvents>, Startable {
})

const webRTCConn = toMultiaddrConnection({
// @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370
peerConnection,
remoteAddr: remoteAddress,
metrics: this.metrics?.listenerEvents,
Expand All @@ -246,7 +233,6 @@ export class WebRTCTransport implements Transport<WebRTCDialEvents>, Startable {
})

// close the connection on shut down
// @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370
this._closeOnShutdown(peerConnection, webRTCConn)
} catch (err: any) {
this.log.error('incoming signaling error - %e', err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ export async function connect (peerConnection: RTCPeerConnection | DirectRTCPeer
// Creating the connection before completion of the noise
// handshake ensures that the stream opening callback is set up
const maConn = toMultiaddrConnection({
// @ts-expect-error types are broken
peerConnection,
remoteAddr: options.remoteAddr,
metrics: options.events,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,90 +1,21 @@
import { Crypto } from '@peculiar/webcrypto'
import { PeerConnection } from 'node-datachannel'
import { RTCPeerConnection } from 'node-datachannel/polyfill'
import * as WebRTCNode from '@mertushka/webrtc-node'
import { DEFAULT_ICE_SERVERS, MAX_MESSAGE_SIZE } from '../../constants.ts'
import { DataChannelMuxerFactory } from '../../muxer.ts'
import { generateTransportCertificate } from './generate-certificates.ts'
import type { DataChannelOptions, TransportCertificate } from '../../index.ts'
import type { CounterGroup } from '@libp2p/interface'
import type { CertificateFingerprint } from 'node-datachannel'

const crypto = new Crypto()

interface DirectRTCPeerConnectionInit extends RTCConfiguration {
ufrag: string
peerConnection: PeerConnection
}

export class DirectRTCPeerConnection extends RTCPeerConnection {
private peerConnection: PeerConnection
private readonly ufrag: string

constructor (init: DirectRTCPeerConnectionInit) {
super(init)

this.peerConnection = init.peerConnection
this.ufrag = init.ufrag

// make sure C++ peer connection is garbage collected
// https://github.com/murat-dogan/node-datachannel/issues/366#issuecomment-3228453155
this.addEventListener('connectionstatechange', () => {
switch (this.connectionState) {
case 'closed':
this.peerConnection.close()
break
default:
break
}
})
}

async createOffer (): Promise<globalThis.RTCSessionDescriptionInit | any> {
// have to set ufrag before creating offer
if (this.connectionState === 'new') {
this.peerConnection?.setLocalDescription('offer', {
iceUfrag: this.ufrag,
icePwd: this.ufrag
})
}

return super.createOffer()
}

async createAnswer (): Promise<globalThis.RTCSessionDescriptionInit | any> {
// have to set ufrag before creating answer
if (this.connectionState === 'new') {
this.peerConnection?.setLocalDescription('answer', {
iceUfrag: this.ufrag,
icePwd: this.ufrag
})
}

return super.createAnswer()
}

remoteFingerprint (): CertificateFingerprint {
if (this.peerConnection == null) {
throw new Error('Invalid state: peer connection not set')
}

return this.peerConnection.remoteFingerprint()
}
}

function mapIceServers (iceServers?: RTCIceServer[]): string[] {
return iceServers
?.map((server) => {
const urls = Array.isArray(server.urls) ? server.urls : [server.urls]

return urls.map((url) => {
if (server.username != null && server.credential != null) {
const [protocol, rest] = url.split(/:(.*)/)
return `${protocol}:${server.username}:${server.credential}@${rest}`
}
return url
})
})
.flat() ?? []
const webRTCNode = (WebRTCNode as unknown as { default?: typeof WebRTCNode }).default ?? WebRTCNode
const {
RTCPeerConnection: NodeRTCPeerConnection,
nonstandard
} = webRTCNode
const RTCPeerConnection = NodeRTCPeerConnection as unknown as typeof globalThis.RTCPeerConnection

export interface DirectRTCPeerConnection extends globalThis.RTCPeerConnection {
remoteFingerprint(): ReturnType<typeof nonstandard.getRemoteFingerprint>
}

export interface CreateDialerRTCPeerConnectionOptions {
Expand Down Expand Up @@ -113,23 +44,32 @@ export async function createDialerRTCPeerConnection (role: 'client' | 'server',
}

const rtcConfig = typeof options.rtcConfiguration === 'function' ? await options.rtcConfiguration() : options.rtcConfiguration
const certificate = nonstandard.importCertificate({
certificatePem: options.certificate.pem,
privateKeyPem: options.certificate.privateKey
})

const peerConnection = new DirectRTCPeerConnection({
...rtcConfig,
ufrag,
peerConnection: new PeerConnection(`${role}-${Date.now()}`, {
disableFingerprintVerification: true,
disableAutoNegotiation: true,
certificatePemFile: options.certificate.pem,
keyPemFile: options.certificate.privateKey,
enableIceUdpMux: role === 'server',
maxMessageSize: MAX_MESSAGE_SIZE,
iceServers: mapIceServers(rtcConfig?.iceServers ?? DEFAULT_ICE_SERVERS.map(urls => ({ urls })))
})
const peerConnection = new RTCPeerConnection({
...(rtcConfig ?? {}),
certificates: [certificate as unknown as globalThis.RTCCertificate],
iceServers: rtcConfig?.iceServers ?? DEFAULT_ICE_SERVERS.map(urls => ({ urls }))
}) as DirectRTCPeerConnection
const nonstandardPeerConnection = peerConnection as unknown as InstanceType<typeof NodeRTCPeerConnection>

nonstandard.configurePeerConnection(nonstandardPeerConnection, {
disableFingerprintVerification: true,
enableIceUdpMux: role === 'server',
maxMessageSize: MAX_MESSAGE_SIZE
})

nonstandard.setLocalIceCredentials(nonstandardPeerConnection, {
iceUfrag: ufrag,
icePwd: ufrag
})

peerConnection.remoteFingerprint = () => nonstandard.getRemoteFingerprint(nonstandardPeerConnection)

const muxerFactory = new DataChannelMuxerFactory({
// @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370
peerConnection,
metrics: options.events,
dataChannelOptions: options.dataChannel
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { isIPv4 } from '@chainsafe/is-ip'
import { IceUdpMuxListener } from 'node-datachannel'
import * as WebRTCNode from '@mertushka/webrtc-node'
import type { Logger } from '@libp2p/interface'
import type { AddressInfo } from 'node:net'

const webRTCNode = (WebRTCNode as unknown as { default?: typeof WebRTCNode }).default ?? WebRTCNode
const { nonstandard } = webRTCNode

export interface StunServer {
close(): Promise<void>
address(): AddressInfo
Expand All @@ -13,7 +16,7 @@ export interface Callback {
}

export async function stunListener (host: string, port: number, log: Logger, cb: Callback): Promise<StunServer> {
const listener = new IceUdpMuxListener(port, host)
const listener = new nonstandard.IceUdpMuxListener(port, host)
listener.onUnhandledStunRequest(request => {
if (request.ufrag == null) {
return
Expand Down
6 changes: 3 additions & 3 deletions packages/transport-webrtc/src/rtcpeerconnection-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ class RTCPeerConnectionMultiaddrConnection extends AbstractMultiaddrConnection {
// nothing else to do but close the connection
this.onTransportClosed()

// only necessary with node-datachannel
// https://github.com/murat-dogan/node-datachannel/issues/366#issuecomment-3228453155
this.peerConnection.close()
if (this.peerConnection.connectionState === 'failed') {
this.peerConnection.close()
}
}
}
}
Expand Down
12 changes: 3 additions & 9 deletions packages/transport-webrtc/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,9 @@ export class WebRTCStream extends AbstractStream {

this.log.trace('sending message, channel state "%s"', this.channel.readyState)

try {
// send message without copying data
for (const buf of data) {
this.channel.send(withArrayBuffer(buf))
}
} catch (err: any) {
// channel.send can throw synchronously if the polyfill's cached readyState is stale
this.log.error('error sending datachannel message - %e', err)
this.abort(err)
// send message without copying data
for (const buf of data) {
this.channel.send(withArrayBuffer(buf))
}
}

Expand Down
3 changes: 1 addition & 2 deletions packages/transport-webrtc/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import pTimeout from 'p-timeout'
import { DATA_CHANNEL_DRAIN_TIMEOUT, DEFAULT_ICE_SERVERS, UFRAG_ALPHABET, UFRAG_PREFIX } from './constants.ts'
import type { LoggerOptions } from '@libp2p/interface'
import type { Duplex, Source } from 'it-stream-types'
import type { PeerConnection } from 'node-datachannel'

export const nopSource = async function * nop (): AsyncGenerator<Uint8Array, any, unknown> {}

Expand Down Expand Up @@ -82,7 +81,7 @@ export interface AbortPromiseOptions {
message?: string
}

export function isPeerConnection (obj: any): obj is PeerConnection {
export function isPeerConnection (obj: any): obj is { state(): RTCPeerConnectionState } {
return typeof obj.state === 'function'
}

Expand Down
13 changes: 12 additions & 1 deletion packages/transport-webrtc/src/webrtc/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,12 @@
export { RTCSessionDescription, RTCIceCandidate, RTCPeerConnection } from 'node-datachannel/polyfill'
import * as WebRTCNode from '@mertushka/webrtc-node'

const webRTCNode = (WebRTCNode as unknown as { default?: typeof WebRTCNode }).default ?? WebRTCNode
const {
RTCIceCandidate: NodeRTCIceCandidate,
RTCPeerConnection: NodeRTCPeerConnection,
RTCSessionDescription: NodeRTCSessionDescription
} = webRTCNode

export const RTCSessionDescription = NodeRTCSessionDescription as unknown as typeof globalThis.RTCSessionDescription
export const RTCIceCandidate = NodeRTCIceCandidate as unknown as typeof globalThis.RTCIceCandidate
export const RTCPeerConnection = NodeRTCPeerConnection as unknown as typeof globalThis.RTCPeerConnection
1 change: 0 additions & 1 deletion packages/transport-webrtc/test/maconn.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ describe('Multiaddr Connection', () => {
reset: () => {}
})
const maConn = toMultiaddrConnection({
// @ts-expect-error https://github.com/murat-dogan/node-datachannel/pull/370
peerConnection,
remoteAddr,
metrics,
Expand Down
18 changes: 13 additions & 5 deletions packages/transport-webrtc/test/peer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ interface PrivateToPrivateComponents {
recipient: Recipient
}

function closePeerConnection (peerConnection?: RTCPeerConnection): void {
if (peerConnection == null || peerConnection.connectionState === 'closed') {
return
}

peerConnection.close()
}

async function getComponents (): Promise<PrivateToPrivateComponents> {
const relayPeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
const initiatorPeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
Expand Down Expand Up @@ -82,8 +90,8 @@ describe('webrtc basic', () => {
let initiatorPeerConnection: RTCPeerConnection

afterEach(() => {
initiatorPeerConnection?.close()
recipient?.peerConnection?.close()
closePeerConnection(initiatorPeerConnection)
closePeerConnection(recipient?.peerConnection)
})

it('should connect', async () => {
Expand Down Expand Up @@ -146,7 +154,7 @@ describe('webrtc receiver', () => {
let recipient: Recipient

afterEach(() => {
recipient?.peerConnection?.close()
closePeerConnection(recipient?.peerConnection)
})

it('should fail receiving on invalid sdp offer', async () => {
Expand All @@ -164,7 +172,7 @@ describe('webrtc dialer', () => {
let recipient: Recipient

afterEach(() => {
recipient?.peerConnection?.close()
closePeerConnection(recipient?.peerConnection)
})

it('should fail receiving on invalid sdp answer', async () => {
Expand Down Expand Up @@ -218,7 +226,7 @@ describe('webrtc dialer', () => {

await expect(initiatorPeerConnectionPromise).to.be.rejectedWith(/Remote should send an SDP answer/)

pc.close()
closePeerConnection(pc)
})
})

Expand Down
Loading