Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions packages/kad-dht/src/peer-distance-list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,21 @@ export class PeerDistanceList {
this.peerDistances = this.peerDistances.slice(0, this.capacity)
}

/**
* True if a peer with this (already-computed) kadId could still enter the
* list: under capacity, or strictly closer than the furthest entry.
*/
canAddKadId (kadId: Uint8Array): boolean {
if (this.peerDistances.length < this.capacity) {
return true
}

const distance = uint8ArrayXor(this.originDhtKey, kadId)
const furthest = this.peerDistances[this.peerDistances.length - 1].distance

return uint8ArrayXorCompare(distance, furthest) === -1
}

/**
* Indicates whether any of the peerIds passed as a parameter are closer
* to the origin key than the furthest peerId in the PeerDistanceList.
Expand Down
1 change: 1 addition & 0 deletions packages/kad-dht/src/query/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ export class QueryManager implements Startable {
query: queryFunc,
path: index,
numPaths: peersToQuery.length,
kBucketSize: this.routingTable.kBucketSize,
alpha: this.alpha,
log,
peersSeen,
Expand Down
24 changes: 23 additions & 1 deletion packages/kad-dht/src/query/query-path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Queue } from '@libp2p/utils'
import { pushable } from 'it-pushable'
import { xor as uint8ArrayXor } from 'uint8arrays/xor'
import { xorCompare as uint8ArrayXorCompare } from 'uint8arrays/xor-compare'
import { PeerDistanceList } from '../peer-distance-list.ts'
import { convertPeerId, convertBuffer } from '../utils.ts'
import { pathEndedEvent, queryErrorEvent } from './events.ts'
import type { QueryEvent } from '../index.ts'
Expand Down Expand Up @@ -47,6 +48,12 @@ export interface QueryPathOptions extends RoutingOptions {
*/
numPaths: number

/**
* The maximum number of closest peers to find - used as the capacity of the
* closest-peer set that drives lookup termination
*/
kBucketSize: number

/**
* Query log
*/
Expand Down Expand Up @@ -77,7 +84,7 @@ interface QueryQueueOptions extends AbortOptions {
* every peer encountered that we have not seen before
*/
export async function * queryPath (options: QueryPathOptions): AsyncGenerator<QueryEvent, void, undefined> {
const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, log, peersSeen, connectionManager, signal } = options
const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, kBucketSize, log, peersSeen, connectionManager, signal } = options
const events = pushable<QueryEvent>({
objectMode: true
})
Expand Down Expand Up @@ -117,6 +124,9 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
signal
})

// closest peers that have responded on this path; drives lookup termination
const closest = new PeerDistanceList(kadId, kBucketSize)

/**
* Adds the passed peer to the query queue if it's not us and no other path
* has passed through this peer
Expand All @@ -131,6 +141,11 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
const peerXor = uint8ArrayXor(peerKadId, kadId)

queue.add(async () => {
if (!closest.canAddKadId(peerKadId)) {
log('skipping %p, path converged', peer.id)
return
}

try {
for await (const event of query({
...options,
Expand All @@ -148,6 +163,8 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
})) {
// if there are closer peers and the query has not completed, continue the query
if (event.name === 'PEER_RESPONSE') {
closest.addWithKadId(peer, peerKadId)

for (const closerPeer of event.closer) {
if (peersSeen.has(closerPeer.id.toMultihash().bytes)) { // eslint-disable-line max-depth
log('already seen %p in query', closerPeer.id)
Expand All @@ -170,6 +187,11 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
continue
}

// skip queuing peers that can't enter the closest set (saves the dialability check)
if (!closest.canAddKadId(closerPeerKadId)) { // eslint-disable-line max-depth
continue
}

// dialability is the most expensive check so only run it for
// peers we would actually query
if (!(await connectionManager.isDialable(closerPeer.multiaddrs, { // eslint-disable-line max-depth
Expand Down
43 changes: 43 additions & 0 deletions packages/kad-dht/test/peer-distance-list.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,47 @@ describe('PeerDistanceList', () => {
expect(closer).to.be.eql(false)
})
})

describe('canAddKadId', () => {
// distance to the key (= p1): p1 < p4 < p3 < p6 < p2 (p7 equals p2)
it('returns true when the list is under capacity', async () => {
const pdl = new PeerDistanceList(key, 100)
await pdl.add(p1)
await pdl.add(p2)

// p7 is as far as the current furthest (p2), but there is spare capacity
const kadId = await kadUtils.convertPeerId(p7.id)
expect(pdl.canAddKadId(kadId)).to.be.true()
})

it('returns true at capacity for a closer peer', async () => {
const pdl = new PeerDistanceList(key, 2)
await pdl.add(p3)
await pdl.add(p2)

// p6 sits between p3 and the furthest (p2), so it would enter the list
const kadId = await kadUtils.convertPeerId(p6.id)
expect(pdl.canAddKadId(kadId)).to.be.true()
})

it('returns false at capacity for a peer equal to the furthest', async () => {
const pdl = new PeerDistanceList(key, 2)
await pdl.add(p3)
await pdl.add(p2)

// p7 is the same distance as the furthest (p2) - equal is not closer
const kadId = await kadUtils.convertPeerId(p7.id)
expect(pdl.canAddKadId(kadId)).to.be.false()
})

it('returns false at capacity for a farther peer', async () => {
const pdl = new PeerDistanceList(key, 2)
await pdl.add(p1)
await pdl.add(p4)

// p6 is farther than the furthest (p4)
const kadId = await kadUtils.convertPeerId(p6.id)
expect(pdl.canAddKadId(kadId)).to.be.false()
})
})
})
131 changes: 131 additions & 0 deletions packages/kad-dht/test/query.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { defaultLogger } from '@libp2p/logger'
import { multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import delay from 'delay'
import all from 'it-all'
Expand Down Expand Up @@ -1030,4 +1031,134 @@ describe('QueryManager', () => {

await manager.stop()
})

it('should not query peers farther than the kth-closest already found', async () => {
// the closest-peer set capacity is the kBucketSize - keep it small so the
// convergence gate is exercised with only a handful of peers
const rt = stubInterface<RoutingTable>({ kBucketSize: 1 })
rt.closestPeers.returns([peers[10].peerId])

const manager = new QueryManager({
peerId: ourPeerId,
logger: defaultLogger(),
connectionManager: stubInterface<ConnectionManager>({
isDialable: async () => true
})
}, {
...defaultInit(),
routingTable: rt,
disjointPaths: 1,
alpha: 1
})
await manager.start()

// peers are sorted closest (0) -> farthest (38)
// 10 -> [1, 9] both closer than the seed (10)
// 1 -> [0]
// 0 -> []
// 9 -> [] once 1 and 0 respond, 9 is farther than the kth-closest
// (k=1) and must NOT be queried, despite being closer than
// its parent (10)
const topology = createTopology({
10: { closerPeers: [1, 9] },
1: { closerPeers: [0] },
0: {},
9: {}
})

const results = await all(manager.run(key, createQueryFunction(topology)))
const traversed = results
.filter(evt => evt.type !== EventTypes.PATH_ENDED)
.map(event => {
if (event.type !== EventTypes.PEER_RESPONSE && event.type !== EventTypes.VALUE) {
throw new Error(`Unexpected query event type ${event.type}`)
}

return event.from.toString()
})

expect(traversed).to.include(peers[10].peerId.toString())
expect(traversed).to.include(peers[1].peerId.toString())
expect(traversed).to.include(peers[0].peerId.toString())
// closer than its parent (10) but farther than the kth-closest found
expect(traversed).to.not.include(peers[9].peerId.toString())

await manager.stop()
})

it('does not run the dialability check for peers that cannot enter the closest set', async () => {
// k = 1, alpha = 2: peers 3 and 9 are queried concurrently. peer 3 responds
// first and fills the size-1 closest set, then peer 9 responds with peer 5,
// which is closer than its parent (9) but farther than the kth-closest (3).
// the gate must prune peer 5 before the (expensive) dialability check runs.
const rt = stubInterface<RoutingTable>({ kBucketSize: 1 })
rt.closestPeers.returns([peers[3].peerId, peers[9].peerId])

const connectionManager = stubInterface<ConnectionManager>()
connectionManager.isDialable.resolves(true)

const manager = new QueryManager({
peerId: ourPeerId,
logger: defaultLogger(),
connectionManager
}, {
...defaultInit(),
routingTable: rt,
disjointPaths: 1,
alpha: 2
})
await manager.start()

// only peer 5 carries an address, so a dial check for it is detectable
const peer5Multiaddr = multiaddr('/ip4/127.0.0.1/tcp/4005')
// release peer 9 only once peer 3 has responded and filled the closest set
const closestFilled = pDefer()

const queryFunc: QueryFunc = async function * (context) {
const { peer } = context
const path = { index: -1, queued: 0, running: 0, total: 0 }

if (peer.id.equals(peers[3].peerId)) {
// peer 2 can enter the set, so it reaches the dial check (spy is live)
yield peerResponseEvent({
from: peer.id,
messageType: MessageType.GET_VALUE,
closer: [{ id: peers[2].peerId, multiaddrs: [], protocols: [] }],
path
})
closestFilled.resolve()
return
}

if (peer.id.equals(peers[9].peerId)) {
await closestFilled.promise
yield peerResponseEvent({
from: peer.id,
messageType: MessageType.GET_VALUE,
closer: [{ id: peers[5].peerId, multiaddrs: [peer5Multiaddr], protocols: [] }],
path
})
return
}

yield peerResponseEvent({
from: peer.id,
messageType: MessageType.GET_VALUE,
closer: [],
path
})
}

await all(manager.run(key, queryFunc))

// the dial check ran (peer 2 could enter the set) ...
expect(connectionManager.isDialable.called).to.be.true()
// ... but never for peer 5, which the gate pruned beforehand
const dialCheckedMultiaddrs = connectionManager.isDialable.getCalls()
.flatMap(call => call.args[0])
.map(ma => ma.toString())
expect(dialCheckedMultiaddrs).to.not.include(peer5Multiaddr.toString())

await manager.stop()
})
})
Loading