diff --git a/packages/kad-dht/src/peer-distance-list.ts b/packages/kad-dht/src/peer-distance-list.ts index dd423a825a..4a8ff95659 100644 --- a/packages/kad-dht/src/peer-distance-list.ts +++ b/packages/kad-dht/src/peer-distance-list.ts @@ -96,6 +96,21 @@ export class PeerDistanceList { this.peerDistances = this.peerDistances.slice(0, this.capacity) } + /** + * True if a peer with this (already-computed) kadId could still enter the + * list: under capacity, or strictly closer than the furthest entry. + */ + canAddKadId (kadId: Uint8Array): boolean { + if (this.peerDistances.length < this.capacity) { + return true + } + + const distance = uint8ArrayXor(this.originDhtKey, kadId) + const furthest = this.peerDistances[this.peerDistances.length - 1].distance + + return uint8ArrayXorCompare(distance, furthest) === -1 + } + /** * Indicates whether any of the peerIds passed as a parameter are closer * to the origin key than the furthest peerId in the PeerDistanceList. diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index 9ee21f8611..827f6aab8d 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -205,6 +205,7 @@ export class QueryManager implements Startable { query: queryFunc, path: index, numPaths: peersToQuery.length, + kBucketSize: this.routingTable.kBucketSize, alpha: this.alpha, log, peersSeen, diff --git a/packages/kad-dht/src/query/query-path.ts b/packages/kad-dht/src/query/query-path.ts index 9e35aba2a0..6694d3f729 100644 --- a/packages/kad-dht/src/query/query-path.ts +++ b/packages/kad-dht/src/query/query-path.ts @@ -3,6 +3,7 @@ import { Queue } from '@libp2p/utils' import { pushable } from 'it-pushable' import { xor as uint8ArrayXor } from 'uint8arrays/xor' import { xorCompare as uint8ArrayXorCompare } from 'uint8arrays/xor-compare' +import { PeerDistanceList } from '../peer-distance-list.ts' import { convertPeerId, convertBuffer } from '../utils.ts' import { pathEndedEvent, queryErrorEvent } from './events.ts' import type { QueryEvent } from '../index.ts' @@ -47,6 +48,12 @@ export interface QueryPathOptions extends RoutingOptions { */ numPaths: number + /** + * The maximum number of closest peers to find - used as the capacity of the + * closest-peer set that drives lookup termination + */ + kBucketSize: number + /** * Query log */ @@ -77,7 +84,7 @@ interface QueryQueueOptions extends AbortOptions { * every peer encountered that we have not seen before */ export async function * queryPath (options: QueryPathOptions): AsyncGenerator { - const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, log, peersSeen, connectionManager, signal } = options + const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, kBucketSize, log, peersSeen, connectionManager, signal } = options const events = pushable({ objectMode: true }) @@ -117,6 +124,9 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator { + if (!closest.canAddKadId(peerKadId)) { + log('skipping %p, path converged', peer.id) + return + } + try { for await (const event of query({ ...options, @@ -148,6 +163,8 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator { expect(closer).to.be.eql(false) }) }) + + describe('canAddKadId', () => { + // distance to the key (= p1): p1 < p4 < p3 < p6 < p2 (p7 equals p2) + it('returns true when the list is under capacity', async () => { + const pdl = new PeerDistanceList(key, 100) + await pdl.add(p1) + await pdl.add(p2) + + // p7 is as far as the current furthest (p2), but there is spare capacity + const kadId = await kadUtils.convertPeerId(p7.id) + expect(pdl.canAddKadId(kadId)).to.be.true() + }) + + it('returns true at capacity for a closer peer', async () => { + const pdl = new PeerDistanceList(key, 2) + await pdl.add(p3) + await pdl.add(p2) + + // p6 sits between p3 and the furthest (p2), so it would enter the list + const kadId = await kadUtils.convertPeerId(p6.id) + expect(pdl.canAddKadId(kadId)).to.be.true() + }) + + it('returns false at capacity for a peer equal to the furthest', async () => { + const pdl = new PeerDistanceList(key, 2) + await pdl.add(p3) + await pdl.add(p2) + + // p7 is the same distance as the furthest (p2) - equal is not closer + const kadId = await kadUtils.convertPeerId(p7.id) + expect(pdl.canAddKadId(kadId)).to.be.false() + }) + + it('returns false at capacity for a farther peer', async () => { + const pdl = new PeerDistanceList(key, 2) + await pdl.add(p1) + await pdl.add(p4) + + // p6 is farther than the furthest (p4) + const kadId = await kadUtils.convertPeerId(p6.id) + expect(pdl.canAddKadId(kadId)).to.be.false() + }) + }) }) diff --git a/packages/kad-dht/test/query.spec.ts b/packages/kad-dht/test/query.spec.ts index 0bb5081aeb..bb5665b5ae 100644 --- a/packages/kad-dht/test/query.spec.ts +++ b/packages/kad-dht/test/query.spec.ts @@ -1,4 +1,5 @@ import { defaultLogger } from '@libp2p/logger' +import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import delay from 'delay' import all from 'it-all' @@ -1030,4 +1031,134 @@ describe('QueryManager', () => { await manager.stop() }) + + it('should not query peers farther than the kth-closest already found', async () => { + // the closest-peer set capacity is the kBucketSize - keep it small so the + // convergence gate is exercised with only a handful of peers + const rt = stubInterface({ kBucketSize: 1 }) + rt.closestPeers.returns([peers[10].peerId]) + + const manager = new QueryManager({ + peerId: ourPeerId, + logger: defaultLogger(), + connectionManager: stubInterface({ + isDialable: async () => true + }) + }, { + ...defaultInit(), + routingTable: rt, + disjointPaths: 1, + alpha: 1 + }) + await manager.start() + + // peers are sorted closest (0) -> farthest (38) + // 10 -> [1, 9] both closer than the seed (10) + // 1 -> [0] + // 0 -> [] + // 9 -> [] once 1 and 0 respond, 9 is farther than the kth-closest + // (k=1) and must NOT be queried, despite being closer than + // its parent (10) + const topology = createTopology({ + 10: { closerPeers: [1, 9] }, + 1: { closerPeers: [0] }, + 0: {}, + 9: {} + }) + + const results = await all(manager.run(key, createQueryFunction(topology))) + const traversed = results + .filter(evt => evt.type !== EventTypes.PATH_ENDED) + .map(event => { + if (event.type !== EventTypes.PEER_RESPONSE && event.type !== EventTypes.VALUE) { + throw new Error(`Unexpected query event type ${event.type}`) + } + + return event.from.toString() + }) + + expect(traversed).to.include(peers[10].peerId.toString()) + expect(traversed).to.include(peers[1].peerId.toString()) + expect(traversed).to.include(peers[0].peerId.toString()) + // closer than its parent (10) but farther than the kth-closest found + expect(traversed).to.not.include(peers[9].peerId.toString()) + + await manager.stop() + }) + + it('does not run the dialability check for peers that cannot enter the closest set', async () => { + // k = 1, alpha = 2: peers 3 and 9 are queried concurrently. peer 3 responds + // first and fills the size-1 closest set, then peer 9 responds with peer 5, + // which is closer than its parent (9) but farther than the kth-closest (3). + // the gate must prune peer 5 before the (expensive) dialability check runs. + const rt = stubInterface({ kBucketSize: 1 }) + rt.closestPeers.returns([peers[3].peerId, peers[9].peerId]) + + const connectionManager = stubInterface() + connectionManager.isDialable.resolves(true) + + const manager = new QueryManager({ + peerId: ourPeerId, + logger: defaultLogger(), + connectionManager + }, { + ...defaultInit(), + routingTable: rt, + disjointPaths: 1, + alpha: 2 + }) + await manager.start() + + // only peer 5 carries an address, so a dial check for it is detectable + const peer5Multiaddr = multiaddr('/ip4/127.0.0.1/tcp/4005') + // release peer 9 only once peer 3 has responded and filled the closest set + const closestFilled = pDefer() + + const queryFunc: QueryFunc = async function * (context) { + const { peer } = context + const path = { index: -1, queued: 0, running: 0, total: 0 } + + if (peer.id.equals(peers[3].peerId)) { + // peer 2 can enter the set, so it reaches the dial check (spy is live) + yield peerResponseEvent({ + from: peer.id, + messageType: MessageType.GET_VALUE, + closer: [{ id: peers[2].peerId, multiaddrs: [], protocols: [] }], + path + }) + closestFilled.resolve() + return + } + + if (peer.id.equals(peers[9].peerId)) { + await closestFilled.promise + yield peerResponseEvent({ + from: peer.id, + messageType: MessageType.GET_VALUE, + closer: [{ id: peers[5].peerId, multiaddrs: [peer5Multiaddr], protocols: [] }], + path + }) + return + } + + yield peerResponseEvent({ + from: peer.id, + messageType: MessageType.GET_VALUE, + closer: [], + path + }) + } + + await all(manager.run(key, queryFunc)) + + // the dial check ran (peer 2 could enter the set) ... + expect(connectionManager.isDialable.called).to.be.true() + // ... but never for peer 5, which the gate pruned beforehand + const dialCheckedMultiaddrs = connectionManager.isDialable.getCalls() + .flatMap(call => call.args[0]) + .map(ma => ma.toString()) + expect(dialCheckedMultiaddrs).to.not.include(peer5Multiaddr.toString()) + + await manager.stop() + }) })