diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..076a30cb --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "editor.formatOnSave": false, + "typescript.format.insertSpaceBeforeFunctionParenthesis": true, + "typescript.format.insertSpaceAfterConstructor": true, + "javascript.format.insertSpaceBeforeFunctionParenthesis": true, + "javascript.format.insertSpaceAfterConstructor": true +} diff --git a/README.md b/README.md index 6a20dc8a..c712f6a4 100644 --- a/README.md +++ b/README.md @@ -279,7 +279,9 @@ If `opts` is specified, then the default options (shown below) will be overridde trickle: true, allowHalfTrickle: false, wrtc: {}, // RTCPeerConnection/RTCSessionDescription/RTCIceCandidate - objectMode: false + objectMode: false, + iceRestartEnabled: "onFailure", + iceFailureRecoveryTimeout: 5000 } ``` @@ -302,6 +304,13 @@ The options do the following: - `objectMode` - set to `true` to create the stream in [Object Mode](https://nodejs.org/api/stream.html#stream_object_mode). In this mode, incoming string data is not automatically converted to `Buffer` objects. +- `iceRestartEnabled` - attempt to automatically reconnect if the network path between peers changes. Warning: may crash firefox versions <= 47 + - Set to "onDisconnect" to attempt reconnection when ICE state changes to disconnected (connection dropped temporarily and may not need restart) + - Set to "onFailure" to trigger reconnection when ICE state changes to failed (connection path fully broken, reconnection required). + - Set to `false` to disable automatic ICE restart - ICE restart can still be triggered manually using `peer.restartIce()` + +- `iceFailureRecoveryTimeout` - miliseconds to wait for ICE restart to complete after the ICE state reaches "failed". + ### `peer.signal(data)` Call this method whenever the remote peer emits a `peer.on('signal')` event. @@ -432,6 +441,12 @@ peer.on('stream', stream => { Received a remote audio/video track. Streams may contain multiple tracks. +### `peer.on('reconnect', () => {})` + +Fired when the peer connection has been re-established after a temporary disconnection due to network changes (ICE Restart) or, in some cases, successful renegotiation. + +The connection is ready to use. The local and remote network canidates & IP addresses may have changed. + ### `peer.on('close', () => {})` Called when the peer connection has closed. diff --git a/full.js b/full.js index 7d7e52ce..9925bdbe 100644 --- a/full.js +++ b/full.js @@ -5,11 +5,17 @@ import { Duplex } from 'streamx' import errCode from 'err-code' import { randomBytes, arr2hex, text2arr } from 'uint8-util' +/** Type Definitions + * @typedef {import('./index.js').SimplePeerOptions} SimplePeerOptions + */ + const Debug = debug('simple-peer') const MAX_BUFFERED_AMOUNT = 64 * 1024 const ICECOMPLETE_TIMEOUT = 5 * 1000 const CHANNEL_CLOSING_TIMEOUT = 5 * 1000 +const ICEFAILURE_RECOVERY_TIMEOUT = 5 * 1000 +const ICE_UFRAG_STRING = 'a=ice-ufrag:' // HACK: Filter trickle lines when trickle is disabled #354 function filterTrickle (sdp) { @@ -23,11 +29,16 @@ function warn (message) { /** * WebRTC peer connection. Same API as node core `net.Socket`, plus a few extra methods. * Duplex stream. - * @param {Object} opts + * @param {SimplePeerOptions} opts */ class Peer extends Duplex { + /** @type {RTCPeerConnection} */ _pc + + /** Create a new Simple Peer instance. + * @param {SimplePeerOptions} opts + */ constructor (opts) { opts = Object.assign({ allowHalfOpen: false @@ -56,8 +67,14 @@ class Peer extends Duplex { this.allowHalfTrickle = opts.allowHalfTrickle !== undefined ? opts.allowHalfTrickle : false this.iceCompleteTimeout = opts.iceCompleteTimeout || ICECOMPLETE_TIMEOUT + // What to do when the connection has dropped and will not recover (ICE reaches failed state) - must be an async function or a function that returns a promise. + this.recoveryAction = opts.recoveryAction ?? this._defaultRecoveryAction + this._destroying = false this._connected = false + this._connecting = false + this._connectedOnce = false + this._iceUFrag = undefined this.remoteAddress = undefined this.remoteFamily = undefined @@ -74,7 +91,6 @@ class Peer extends Duplex { } } - this._pcReady = false this._channelReady = false this._iceComplete = false // ice candidate trickle done (got null candidate) this._iceCompleteTimer = null // send an offer/answer anyway after some timeout @@ -82,6 +98,9 @@ class Peer extends Duplex { this._pendingCandidates = [] this._isNegotiating = false // is this peer waiting for negotiation to complete? + this._shouldRestartIce = false // should ice be restarted on the next negotiation? + this._isRestartingIce = false // is true while restarting ice and false once connected (only on initiator side) + this._isRecoveringIceFailure = false // is true while recovering from ice failure (the promise returned by recoveryAction is pending) this._firstNegotiation = true this._batchedNegotiation = false // batch synchronous negotiations this._queuedNegotiation = false // is there a queued negotiation request? @@ -215,6 +234,19 @@ class Peer extends Duplex { }) this._pendingCandidates = [] + // Detect a remote ice restart: Ice restart will cause incoming ice candidates to use a new username fragment (ufrag) + const sdp = this._pc.remoteDescription.sdp; + const iceuFragSdpStartPos = sdp.indexOf(ICE_UFRAG_STRING) + if (iceuFragSdpStartPos !== -1) { + const uFragStart = iceuFragSdpStartPos + ICE_UFRAG_STRING.length + const uFragEnd = sdp.indexOf('\n', uFragStart) + const uFragStr = sdp.slice(uFragStart, uFragEnd > 0 ? uFragEnd : undefined).trim() + this._debug('Remote ICE username fragment (from sdp): ' + uFragStr) + this._updateUFrag(uFragStr) + } else { + this._debug('No remote ICE username fragment found in sdp:', sdp) + } + if (this._pc.remoteDescription.type === 'offer') this._createAnswer() }) .catch(err => { @@ -228,6 +260,8 @@ class Peer extends Duplex { _addIceCandidate (candidate) { const iceCandidateObj = new RTCIceCandidate(candidate) + + // Add the candidate to the remote peer this._pc.addIceCandidate(iceCandidateObj) .catch(err => { if (!iceCandidateObj.address || iceCandidateObj.address.endsWith('.local')) { @@ -423,6 +457,115 @@ class Peer extends Duplex { this._isNegotiating = true } + _updateUFrag (uFrag) { + if (uFrag && uFrag !== this._iceUFrag) { + if (this._iceUFrag !== undefined) { + this._iceComplete = false // Reset iceComplete + this._isRestartingIce = true + this.emit("restartingIce", true) + this._debug('Remote ICE username fragment changed: ' + this._iceUFrag + "->" + uFrag) + } + this._iceUFrag = uFrag + } + } + + /** + * Trigger an ICE Restart on the WebRTC connection. + * This will re-gather network candidates and find the best network route between peers. + * Useful for re-establishing connection or improving latency between peers when networks change. + * ICE Restarts should not cause media or data pauses, unless the connection cannot be re-established. + * @warn Ice restarts are only allowed on the initiator peer! + * @returns {boolean} - Returns true if ice restart was initiated successfully; false if conditions aren't met for ice restart (not the initiator, already restarting ice, destroyed(ing) peer). + */ + restartIce () { + if (this.destroyed || this._destroying) return false; + if (!this.initiator) { + this._debug('restartIce() only works for the initiator') + return false; + } if (this._isRestartingIce) { + this._debug('Already restarting ice, ignoring restartIce()') + return false; + } else { + this._debug('Restarting ICE') + // if (this._iceFailureRecoveryTimer != null) { + // // Restart the recovery timer when restartIce() is manually called, + // // Note: this._iceFailureRecoveryTimer being non-null indicates that ice has previously entered the failed state and has not recovered by now. + // clearTimeout(this._iceFailureRecoveryTimer) + // this._iceFailureRecoveryTimer = null; + // this._startIceFailureRecoveryTimeout() + // } + this._iceComplete = false // Reset iceComplete + clearTimeout(this._iceCompleteTimer) // Clear _iceCompleteTimer too + this._iceCompleteTimer = null // Clear _iceCompleteTimer too + this._isNegotiating = false // allow renegotiation and createOffer to happen + this._isRestartingIce = true; + this.emit("restartingIce", true) + if (this._pc.restartIce) this._pc.restartIce() + this._shouldRestartIce = (this.initiator == true); + this._needsNegotiation() // Start a new negotiating cycle + return true + } + } + + /** returns true when an ice restart has started and not completed */ + isIceRestartInProgress () { + return this._isRestartingIce; + } + + /** + * calls the configured recoveryAction function or promise, waits for it to complete, + * and then checks if the connection is re-estabilshed and otherwise destroys the peer + * @param {string} errorCode - error code to be used in case of failure eg: 'ERR_ICE_CONNECTION_FAILURE' + **/ + _onConnectionFailure (errorCode) { + if (this.destroyed || this._destroying) return + const iceConnectionState = this._pc.iceConnectionState + const iceGatheringState = this._pc.iceGatheringState + this._debug('onIceFailure', iceConnectionState, iceGatheringState, this._iceComplete) + if (this._isRecoveringIceFailure) return + if (this.recoveryAction && typeof this.recoveryAction == 'function') { + this._debug('Attempting recovery...') + this._isRecoveringIceFailure = true + this.recoveryAction(this).then(() => { + this._debug('Recovery finished') + this._isRecoveringIceFailure = false + this._maybeReady() + }).catch((err) => { + this._isRecoveringIceFailure = false + this._debug('Recovery failed: ' + err + errCode) + this.__destroy(errCode(err, errorCode)) + }) + } else { + this.__destroy(errCode(new Error('Recovery action not set.'), errorCode)) + } + } + + /** + * Default action to run when an ice connection failure is detected. + * This function will attempt to restart ice and wait for the connection to re-establish. + * If the connection is not re-established within a timeout, the promise rejects and the peer will be destroyed. + * @param {Peer} thisPeer - The peer the recovery action is being called on + */ + async _defaultRecoveryAction (thisPeer) { + if (!thisPeer.trickle) thisPeer.__destroy(errCode(new Error('Connection Failed - Not doing recovery b/c trickle disabled.'), 'ERR_ICE_CONNECTION_FAILURE')) + await new Promise((resolve, reject) => { + if(thisPeer.initiator) thisPeer.restartIce() + const onIceStateChange = (iceConnectionState, iceGatheringState) => { + if (iceConnectionState === 'connected' || iceConnectionState === 'completed') { + resolve() + } else if (iceConnectionState === 'failed') { + reject(new Error('Recovery Ice Failed')) + } else return; + thisPeer.off('iceStateChange', onIceStateChange) + } + thisPeer.on('iceStateChange', onIceStateChange) + setTimeout(() => { + thisPeer.off('iceStateChange', onIceStateChange) + reject(new Error('Recovery Timeout')) + }, ICEFAILURE_RECOVERY_TIMEOUT) // Arbitrary timeout, can be adjusted by supplying your own recoveryAction function or promise + }) + } + _final (cb) { if (!this._readableState.ended) this.push(null) cb(null) @@ -430,7 +573,7 @@ class Peer extends Duplex { __destroy (err) { this.end() - this._destroy(() => {}, err) + this._destroy(() => { }, err) } _destroy (cb, err) { @@ -441,7 +584,6 @@ class Peer extends Duplex { setTimeout(() => { // allow events concurrent with the call to _destroy() to fire (see #692) this._connected = false - this._pcReady = false this._channelReady = false this._remoteTracks = null this._remoteStreams = null @@ -560,8 +702,8 @@ class Peer extends Duplex { } } - // When stream finishes writing, close socket. Half open connections are not - // supported. + /** When stream finishes writing, close socket. Half open connections are not + supported. */ _onFinish () { if (this.destroyed) return @@ -584,10 +726,9 @@ class Peer extends Duplex { this._debug('started iceComplete timeout') this._iceCompleteTimer = setTimeout(() => { if (!this._iceComplete) { - this._iceComplete = true this._debug('iceComplete timeout completed') this.emit('iceTimeout') - this.emit('_iceComplete') + this._onIceComplete(); } }, this.iceCompleteTimeout) } @@ -595,7 +736,12 @@ class Peer extends Duplex { _createOffer () { if (this.destroyed) return - this._pc.createOffer(this.offerOptions) + const offerOptions = Object.assign({ + iceRestart: this._shouldRestartIce + }, this.offerOptions) // copy offer options + this._shouldRestartIce = false + + this._pc.createOffer(offerOptions) .then(offer => { if (this.destroyed) return if (!this.trickle && !this.allowHalfTrickle) offer.sdp = filterTrickle(offer.sdp) @@ -683,8 +829,14 @@ class Peer extends Duplex { _onConnectionStateChange () { if (this.destroyed || this._destroying) return - if (this._pc.connectionState === 'failed') { - this.__destroy(errCode(new Error('Connection failed.'), 'ERR_CONNECTION_FAILURE')) + this._debug('_onConnectionStateChange ' + this._pc.connectionState) + + if (this._pc.connectionState !== "connected") { + this._connected = false + } + + if (this._pc.connectionState === 'failed' && this._pc.iceConnectionState === 'failed') { + this._onConnectionFailure('ERR_CONNECTION_FAILURE') } } @@ -700,18 +852,35 @@ class Peer extends Duplex { ) this.emit('iceStateChange', iceConnectionState, iceGatheringState) + if (iceGatheringState === 'complete') { + this._onIceComplete(); + } + if (iceConnectionState === 'connected' || iceConnectionState === 'completed') { - this._pcReady = true + this._isRestartingIce = false + this._shouldRestartIce = false this._maybeReady() } - if (iceConnectionState === 'failed') { - this.__destroy(errCode(new Error('Ice connection failed.'), 'ERR_ICE_CONNECTION_FAILURE')) + else if (iceConnectionState === 'failed') { + this._onConnectionFailure('ERR_ICE_CONNECTION_FAILURE') } - if (iceConnectionState === 'closed') { + else if (iceConnectionState === 'closed') { this.__destroy(errCode(new Error('Ice connection closed.'), 'ERR_ICE_CONNECTION_CLOSED')) } } + _onIceComplete () { + if (!this._iceComplete) { + this._iceComplete = true + this._isRestartingIce = false + this._shouldRestartIce = false + this._debug('iceComplete') + this.emit('_iceComplete') + clearTimeout(this._iceCompleteTimer) + this._iceCompleteTimer = null + } + } + getStats (cb) { // statreports can come with a value array instead of properties const flattenValues = report => { @@ -762,9 +931,10 @@ class Peer extends Duplex { } _maybeReady () { - this._debug('maybeReady pc %s channel %s', this._pcReady, this._channelReady) - if (this._connected || this._connecting || !this._pcReady || !this._channelReady) return - + const iceConnectionState = this._pc.iceConnectionState + const pcReady = iceConnectionState === 'connected' || iceConnectionState === 'completed' + this._debug('maybeReady pc %s channel %s', pcReady, this._channelReady) + if (this._connecting || !pcReady || !this._channelReady) return this._connecting = true // HACK: We can't rely on order here, for details see https://github.com/js-platform/node-webrtc/issues/339 @@ -887,15 +1057,22 @@ class Peer extends Duplex { cb(null) } - // If `bufferedAmountLowThreshold` and 'onbufferedamountlow' are unsupported, - // fallback to using setInterval to implement backpressure. - if (typeof this._channel.bufferedAmountLowThreshold !== 'number') { - this._interval = setInterval(() => this._onInterval(), 150) - if (this._interval.unref) this._interval.unref() - } + if (!this._connectedOnce) { + this._connectedOnce = true + + // If `bufferedAmountLowThreshold` and 'onbufferedamountlow' are unsupported, + // fallback to using setInterval to implement backpressure. + if (typeof this._channel.bufferedAmountLowThreshold !== 'number') { + this._interval = setInterval(() => this._onInterval(), 150) + if (this._interval.unref) this._interval.unref() + } - this._debug('connect') - this.emit('connect') + this._debug('connect') + this.emit('connect') + } else { + this._debug('reconnect') + this.emit('reconnect') + } }) } findCandidatePair() @@ -948,8 +1125,8 @@ class Peer extends Duplex { } }) } else if (!event.candidate && !this._iceComplete) { - this._iceComplete = true - this.emit('_iceComplete') + // a null ICE candidate indicates that the ice gathering process is finished + this._onIceComplete() } // as soon as we've received one valid candidate start timeout if (event.candidate) { diff --git a/index.js b/index.js index 0a1aa0c7..f8804304 100644 --- a/index.js +++ b/index.js @@ -2,12 +2,23 @@ import Lite from './lite.js' import errCode from 'err-code' +/** Type Definitions + * @typedef {import('./lite.js').SimplePeerLiteOptions & { + * stream?: false | MediaStream; + * streams?: MediaStream[]; + * }} SimplePeerOptions + */ + /** * WebRTC peer connection. Same API as node core `net.Socket`, plus a few extra methods. * Duplex stream. - * @param {Object} opts + * @param {SimplePeerOptions} opts */ class Peer extends Lite { + + /** Create a new Simple Peer instance. + * @param {SimplePeerOptions} opts + */ constructor (opts = {}) { super(opts) if (!this._pc) return diff --git a/lite.js b/lite.js index d8b15b8c..f11b4f4a 100644 --- a/lite.js +++ b/lite.js @@ -5,11 +5,31 @@ import { Duplex } from 'streamx' import errCode from 'err-code' import { randomBytes, arr2hex, text2arr } from 'uint8-util' +/** Type Definitions + * Simple Peer Lite Options: + * @typedef {{ + * initiator: boolean; + * channelName?: string; + * channelConfig?: RTCDataChannelInit; + * config?: RTCConfiguration; + * offerOptions?: RTCOfferOptions; + * answerOptions?: RTCAnswerOptions; + * sdpTransform?: (string) => string; + * wrtc?: { RTCPeerConnection: function, RTCSessionDescription: function, RTCIceCandidate: function }; + * trickle?: boolean; + * allowHalfTrickle?: boolean; + * objectMode?: boolean; + * iceRestartEnabled?: false | "onFailure" | "onDisconnect"; + * iceFailureRecoveryTimeout?: number; //miliseconds to wait for ice restart to complete after the ice state reaches "failed". + * }} SimplePeerLiteOptions + */ + const Debug = debug('simple-peer') const MAX_BUFFERED_AMOUNT = 64 * 1024 const ICECOMPLETE_TIMEOUT = 5 * 1000 const CHANNEL_CLOSING_TIMEOUT = 5 * 1000 +const ICEFAILURE_RECOVERY_TIMEOUT = 5 * 1000 // HACK: Filter trickle lines when trickle is disabled #354 function filterTrickle (sdp) { @@ -23,11 +43,16 @@ function warn (message) { /** * WebRTC peer connection. Same API as node core `net.Socket`, plus a few extra methods. * Duplex stream. - * @param {Object} opts + * @param {SimplePeerOptions} opts */ class Peer extends Duplex { + /** @type {RTCPeerConnection} */ _pc + + /** Create a new Simple Peer instance. + * @param {SimplePeerOptions} opts + */ constructor (opts) { opts = Object.assign({ allowHalfOpen: false @@ -55,8 +80,16 @@ class Peer extends Duplex { this.allowHalfTrickle = opts.allowHalfTrickle !== undefined ? opts.allowHalfTrickle : false this.iceCompleteTimeout = opts.iceCompleteTimeout || ICECOMPLETE_TIMEOUT + // Ice restart often only makes sense if trickle is enabled, and isn't currently supported in wrtc node polyfill https://github.com/feross/simple-peer/issues/579 + this.iceRestartEnabled = opts.iceRestartEnabled ?? ((this.trickle === true && !opts.wrtc) ? "onFailure" : false) + if (this.iceRestartEnabled === true) this.iceRestartEnabled = "onFailure" // default to "onFailure" if user mistakenly passes true instead of a string + this.iceFailureRecoveryTimeout = opts.iceFailureRecoveryTimeout ?? ICEFAILURE_RECOVERY_TIMEOUT // how long to wait for recovery from failed state + this._iceFailureRecoveryTimer = null + this._destroying = false this._connected = false + this._connecting = false + this._connectedOnce = false this.remoteAddress = undefined this.remoteFamily = undefined @@ -75,12 +108,13 @@ class Peer extends Duplex { this._pcReady = false this._channelReady = false - this._iceComplete = false // ice candidate trickle done (got null candidate) - this._iceCompleteTimer = null // send an offer/answer anyway after some timeout + this._iceGatheringComplete = false // ice candidate trickle done (got null candidate) + this._iceGatheringCompleteTimer = null // send an offer/answer anyway after some timeout this._channel = null this._pendingCandidates = [] this._isNegotiating = false // is this peer waiting for negotiation to complete? + this._isRestartingIce = false // is true while restarting ice and false once connected (only on initiator side) this._firstNegotiation = true this._batchedNegotiation = false // batch synchronous negotiations this._queuedNegotiation = false // is there a queued negotiation request? @@ -282,6 +316,66 @@ class Peer extends Duplex { this._isNegotiating = true } + /** + * Trigger an ICE Restart on the WebRTC connection. + * This will re-gather network candidates and find the best network route between peers. + * Useful for re-establishing connection or improving latency between peers when networks change. + * ICE Restarts should not cause media or data pauses, unless the connection cannot be re-established. + * @warn Ice restarts are only allowed on the initiator peer! + * @returns {boolean} - Returns true if ice restart was initiated successfully; false if conditions aren't met for ice restart (not the initiator, already restarting ice, destroyed(ing) peer). + */ + restartIce () { + if (this.destroyed || this._destroying) return false; + if (!this.initiator) { + this._debug('restartIce() only works for the initiator') + return false; + } if (this._isRestartingIce) { + this._debug('Already restarting ice, ignoring restartIce()') + return false; + } else { + this._debug('Restarting ICE') + if (this._iceFailureRecoveryTimer != null) { + // Restart the recovery timer when restartIce() is manually called, + // Note: this._iceFailureRecoveryTimer being non-null indicates that ice has previously entered the failed state and has not recovered by now. + clearTimeout(this._iceFailureRecoveryTimer) + this._iceFailureRecoveryTimer = null; + this._startIceFailureRecoveryTimeout() + } + this._iceGatheringComplete = false // Reset iceComplete + clearTimeout(this._iceGatheringCompleteTimer) // Clear _iceGatheringCompleteTimer too + this._iceGatheringCompleteTimer = null // Clear _iceGatheringCompleteTimer too + this._isNegotiating = false // allow renegotiation and createOffer to happen + this._isRestartingIce = true; + if (this._pc.restartIce) this._pc.restartIce() + this._needsNegotiation() // Start a new negotiating cycle + return true + } + } + + /** + * calls __destroy() after the iceFailureRecoveryTimeout time if we dont + * re-establish connection (this function is called once ice enters the failed state) + **/ + _startIceFailureRecoveryTimeout () { + if (this.destroyed || this._destroying) return + if (this._iceFailureRecoveryTimer != null) return + this._debug('started iceFailureRecovery timeout') + this._iceGatheringComplete = false // Reset iceComplete + clearTimeout(this._iceGatheringCompleteTimer) // Clear _iceGatheringCompleteTimer too + this._iceGatheringCompleteTimer = null // Clear _iceGatheringCompleteTimer too + this._iceFailureRecoveryTimer = setTimeout(() => { + const iceConnectionState = this._pc.iceConnectionState + const iceGatheringState = this._pc.iceGatheringState + this._debug('checking iceFailureRecovery timeout', iceConnectionState, iceGatheringState, this._iceGatheringComplete) + let hasFailedToRecover = !(iceConnectionState === 'connected' || iceConnectionState === 'completed') + if (hasFailedToRecover) { + this._debug('iceFailureRecovery timeout completed - failed') + this.__destroy(errCode(new Error('Ice connection recovery failed.'), 'ERR_ICE_CONNECTION_FAILURE')) + } + }, this.iceFailureRecoveryTimeout) + } + + _final (cb) { if (!this._readableState.ended) this.push(null) cb(null) @@ -289,7 +383,7 @@ class Peer extends Duplex { __destroy (err) { this.end() - this._destroy(() => {}, err) + this._destroy(() => { }, err) } _destroy (cb, err) { @@ -320,7 +414,7 @@ class Peer extends Duplex { if (this._channel) { try { this._channel.close() - } catch (err) {} + } catch (err) { } // allow events concurrent with destruction to be handled this._channel.onmessage = null @@ -331,7 +425,7 @@ class Peer extends Duplex { if (this._pc) { try { this._pc.close() - } catch (err) {} + } catch (err) { } // allow events concurrent with destruction to be handled this._pc.oniceconnectionstatechange = null @@ -419,8 +513,8 @@ class Peer extends Duplex { } } - // When stream finishes writing, close socket. Half open connections are not - // supported. + /** When stream finishes writing, close socket. Half open connections are not + supported. */ _onFinish () { if (this.destroyed) return @@ -439,14 +533,13 @@ class Peer extends Duplex { _startIceCompleteTimeout () { if (this.destroyed) return - if (this._iceCompleteTimer) return + if (this._iceGatheringCompleteTimer) return this._debug('started iceComplete timeout') - this._iceCompleteTimer = setTimeout(() => { - if (!this._iceComplete) { - this._iceComplete = true + this._iceGatheringCompleteTimer = setTimeout(() => { + if (!this._iceGatheringComplete) { this._debug('iceComplete timeout completed') this.emit('iceTimeout') - this.emit('_iceComplete') + this._onIceGatheringComplete(); } }, this.iceCompleteTimeout) } @@ -454,7 +547,10 @@ class Peer extends Duplex { _createOffer () { if (this.destroyed) return - this._pc.createOffer(this.offerOptions) + const offerOptions = Object.assign({}, this.offerOptions) // copy offer options + if (this._isRestartingIce) this.offerOptions.iceRestart = true + + this._pc.createOffer(offerOptions) .then(offer => { if (this.destroyed) return if (!this.trickle && !this.allowHalfTrickle) offer.sdp = filterTrickle(offer.sdp) @@ -473,8 +569,8 @@ class Peer extends Duplex { const onSuccess = () => { this._debug('createOffer success') if (this.destroyed) return - if (this.trickle || this._iceComplete) sendOffer() - else this.once('_iceComplete', sendOffer) // wait for candidates + if (this.trickle || this._iceGatheringComplete) sendOffer() + else this.once('_iceGatheringComplete', sendOffer) // wait for candidates } const onError = err => { @@ -512,8 +608,8 @@ class Peer extends Duplex { const onSuccess = () => { if (this.destroyed) return - if (this.trickle || this._iceComplete) sendAnswer() - else this.once('_iceComplete', sendAnswer) + if (this.trickle || this._iceGatheringComplete) sendAnswer() + else this.once('_iceGatheringComplete', sendAnswer) } const onError = err => { @@ -531,8 +627,27 @@ class Peer extends Duplex { _onConnectionStateChange () { if (this.destroyed || this._destroying) return + this._debug('_onConnectionStateChange ' + this._pc.connectionState) + + if (this._pc.connectionState !== "connected") { + this._connected = false + } + if (this._pc.connectionState === 'failed') { - this.__destroy(errCode(new Error('Connection failed.'), 'ERR_CONNECTION_FAILURE')) + if (this.iceRestartEnabled || this._isRestartingIce) { + this._startIceFailureRecoveryTimeout() + } else { + return this.__destroy(errCode(new Error('Connection failed.'), 'ERR_CONNECTION_FAILURE')) + } + } + + if ( + (this._pc.connectionState === 'failed' && this.iceRestartEnabled === 'onFailure') || + (this._pc.connectionState === 'disconnected' && this.iceRestartEnabled === 'onDisconnect') + ) { + if (this.initiator && !this._isRestartingIce) { + this.restartIce() + } } } @@ -548,15 +663,45 @@ class Peer extends Duplex { ) this.emit('iceStateChange', iceConnectionState, iceGatheringState) + if (iceGatheringState === 'complete') { + this._onIceGatheringComplete(); + } + if (iceConnectionState === 'connected' || iceConnectionState === 'completed') { + if (this._iceFailureRecoveryTimer != null) { + clearTimeout(this._iceFailureRecoveryTimer) + this._iceFailureRecoveryTimer = null + } this._pcReady = true + this._isRestartingIce = false this._maybeReady() } - if (iceConnectionState === 'failed') { - this.__destroy(errCode(new Error('Ice connection failed.'), 'ERR_ICE_CONNECTION_FAILURE')) - } + if (iceConnectionState === 'closed') { this.__destroy(errCode(new Error('Ice connection closed.'), 'ERR_ICE_CONNECTION_CLOSED')) + } else if (iceConnectionState === 'failed' && !this.iceRestartEnabled && !this._isRestartingIce) { + this.__destroy(errCode(new Error('Ice connection failed.'), 'ERR_ICE_CONNECTION_FAILURE')) + } else if (iceConnectionState === 'failed' && (this.iceRestartEnabled || this._isRestartingIce)) { + this._startIceFailureRecoveryTimeout() + } + + if ( + (iceConnectionState === 'failed' && this.iceRestartEnabled === 'onFailure') || + (iceConnectionState === 'disconnected' && this.iceRestartEnabled === 'onDisconnect') + ) { + if (this.initiator && !this._isRestartingIce) { + this.restartIce() + } + } + } + + _onIceGatheringComplete () { + if (!this._iceGatheringComplete) { + this._debug('iceGatheringComplete') + this._iceGatheringComplete = true + this.emit('_iceGatheringComplete') + clearTimeout(this._iceGatheringCompleteTimer) + this._iceGatheringCompleteTimer = null } } @@ -582,7 +727,7 @@ class Peer extends Duplex { cb(null, reports) }, err => cb(err)) - // Single-parameter callback-based getStats() (non-standard) + // Single-parameter callback-based getStats() (non-standard) } else if (this._pc.getStats.length > 0) { this._pc.getStats(res => { // If we destroy connection in `connect` callback this code might happen to run when actual connection is already closed @@ -602,8 +747,8 @@ class Peer extends Duplex { cb(null, reports) }, err => cb(err)) - // Unknown browser, skip getStats() since it's anyone's guess which style of - // getStats() they implement. + // Unknown browser, skip getStats() since it's anyone's guess which style of + // getStats() they implement. } else { cb(null, []) } @@ -611,7 +756,7 @@ class Peer extends Duplex { _maybeReady () { this._debug('maybeReady pc %s channel %s', this._pcReady, this._channelReady) - if (this._connected || this._connecting || !this._pcReady || !this._channelReady) return + if (this._connecting || !this._pcReady || !this._channelReady) return this._connecting = true @@ -735,15 +880,22 @@ class Peer extends Duplex { cb(null) } - // If `bufferedAmountLowThreshold` and 'onbufferedamountlow' are unsupported, - // fallback to using setInterval to implement backpressure. - if (typeof this._channel.bufferedAmountLowThreshold !== 'number') { - this._interval = setInterval(() => this._onInterval(), 150) - if (this._interval.unref) this._interval.unref() - } + if (!this._connectedOnce) { + this._connectedOnce = true + + // If `bufferedAmountLowThreshold` and 'onbufferedamountlow' are unsupported, + // fallback to using setInterval to implement backpressure. + if (typeof this._channel.bufferedAmountLowThreshold !== 'number') { + this._interval = setInterval(() => this._onInterval(), 150) + if (this._interval.unref) this._interval.unref() + } - this._debug('connect') - this.emit('connect') + this._debug('connect') + this.emit('connect') + } else { + this._debug('reconnect') + this.emit('reconnect') + } }) } findCandidatePair() @@ -795,9 +947,9 @@ class Peer extends Duplex { sdpMid: event.candidate.sdpMid } }) - } else if (!event.candidate && !this._iceComplete) { - this._iceComplete = true - this.emit('_iceComplete') + } else if (!event.candidate && !this._iceGatheringComplete) { + // a null ICE candidate indicates that the ice gathering process is finished + this._onIceGatheringComplete() } // as soon as we've received one valid candidate start timeout if (event.candidate) { diff --git a/test/negotiation.js b/test/negotiation.js index 680248e4..ed75b300 100644 --- a/test/negotiation.js +++ b/test/negotiation.js @@ -192,3 +192,102 @@ test('negotiated channels', function (t) { t.pass('peer2 connect') }) }) + +test('ice restart causes renegotiation', function (t) { + if (!process.browser) return t.end() + t.plan(8) + t.timeoutAfter(20000) + + const peer1 = new Peer({ initiator: true, iceRestartEnabled: "onDisconnect" }) + const peer2 = new Peer({ iceRestartEnabled: "onDisconnect" }) + + // peer1._debug = (...args) => { console.log('peer1 ' + args.shift(), ...args) } + // peer2._debug = (...args) => { console.log('peer2 ' + args.shift(), ...args) } + + peer1.on('signal', function (data) { + if (!peer2.destroyed) peer2.signal(data) + }) + peer2.on('signal', function (data) { + if (!peer1.destroyed) peer1.signal(data) + }) + + peer1.once('connect', tryTest) + peer2.once('connect', tryTest) + + function tryTest () { + if (!peer1.connected || !peer2.connected) return + + peer1.restartIce(); + setTimeout(() => { + t.equal(peer1.restartIce(), false, 'peer1 calling restartIce() again in quick succession should return false because we are already restarting ice') + }, 0) // test that calling restartIce multiple times doesn't break anything + t.equal(peer2.restartIce(), false, 'peer2 restartIce should return false because peer2 is not the initiator') + + peer1.once('reconnect', function () { + t.pass('peer1 reconnect after ice restart') + }) + + peer2.once('reconnect', function () { + t.pass('peer2 reconnect after ice restart') + }) + + peer1.once('connect', function () { + t.fail('peer1 connect event after ice restart, should be reconnect') + }) + + peer2.once('connect', function () { + t.fail('peer2 connect event after ice restart, should be reconnect') + }) + + function onPeer1SignalState (state) { + if (state === 'stable') { + t.pass('peer1 stable after ice restart') + peer1.removeListener('signalingStateChange', onPeer1SignalState) + } else { + console.log('peer1 signalingStatechange = ' + state) + } + } + + function onPeer2SignalState(state) { + if (state === 'stable') { + t.pass('peer2 stable after ice restart') + peer2.removeListener('signalingStateChange', onPeer2SignalState) + } else { + console.log('peer2 signalingStatechange = ' + state) + } + } + + peer1.on('signalingStateChange', onPeer1SignalState) + peer2.on('signalingStateChange', onPeer2SignalState) + + function onPeer1IceStateChange(state, gatheringState) { + if (state === 'connected' && gatheringState === 'complete') { + t.pass('peer1 got ice connected state after ice restart') + peer1.removeListener('iceStateChange', onPeer1IceStateChange) + } else { + console.log('peer1 iceStatechange: ice = ' + state + ', gathering = ' + gatheringState) + } + } + + function onPeer2IceStateChange(state, gatheringState) { + if (state === 'connected' && gatheringState === 'complete') { + t.pass('peer2 got ice connected state after ice restart') + peer2.removeListener('iceStateChange', onPeer2IceStateChange) + } else { + console.log('peer2 iceStatechange: ice = ' + state + ', gathering = ' + gatheringState) + } + } + + peer1.on('iceStateChange', onPeer1IceStateChange) + peer2.on('iceStateChange', onPeer2IceStateChange) + + peer1.on('_iceGatheringComplete', function () { + console.log('peer1 _iceGatheringComplete') + }) + + peer2.on('_iceGatheringComplete', function () { + console.log('peer2 _iceGatheringComplete') + }) + + } +})