From b76cc8011970a05558ee2bf161150fa678da6505 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 28 Feb 2023 19:53:12 -0800 Subject: [PATCH 1/5] Add perf protocol --- src/perf/index.ts | 133 ++++++++++++++++++++++++++++++++++++++++ test/perf/index.spec.ts | 93 ++++++++++++++++++++++++++++ 2 files changed, 226 insertions(+) create mode 100644 src/perf/index.ts create mode 100644 test/perf/index.spec.ts diff --git a/src/perf/index.ts b/src/perf/index.ts new file mode 100644 index 0000000000..83c7362113 --- /dev/null +++ b/src/perf/index.ts @@ -0,0 +1,133 @@ +import { logger } from '@libp2p/logger' +import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { Startable } from '@libp2p/interfaces/startable' +import type { AbortOptions } from '@libp2p/interfaces' +import type { ConnectionManager } from '@libp2p/interface-connection-manager' + +export const PROTOCOL = '/perf/1.0.0' + +const log = logger('libp2p:perf') + +const writeBlockSize = BigInt(64 << 10) +const maxStreams = 1 << 10 + +export interface PerfComponents { + registrar: Registrar + connectionManager: ConnectionManager +} + +export class PerfService implements Startable { + public readonly protocol: string + private readonly components: PerfComponents + private started: boolean + private readonly databuf: ArrayBuffer + + constructor (components: PerfComponents) { + this.components = components + this.started = false + this.protocol = PROTOCOL + this.databuf = new ArrayBuffer(Number(writeBlockSize)) + } + + async start () { + await this.components.registrar.handle(this.protocol, (data: IncomingStreamData) => { void this.handleMessage(data) }, { + maxInboundStreams: maxStreams, + maxOutboundStreams: maxStreams + }) + this.started = true + } + + async stop () { + await this.components.registrar.unhandle(this.protocol) + this.started = false + } + + isStarted () { + return this.started + } + + async handleMessage (data: IncomingStreamData) { + const { stream } = data + + let bytesToSendBack: bigint | null = null + for await (const buf of stream.source) { + if (bytesToSendBack === null) { + bytesToSendBack = BigInt(buf.getBigUint64(0, false)) + } + // Ingest all the bufs and wait for the read side to close + } + + const uint8Buf = new Uint8Array(this.databuf) + + if (bytesToSendBack === null) { + throw new Error('bytesToSendBack was null') + } + await stream.sink(async function * () { + while (bytesToSendBack > 0n) { + let toSend: bigint = writeBlockSize + if (toSend > bytesToSendBack) { + toSend = bytesToSendBack + } + bytesToSendBack = bytesToSendBack - toSend + yield uint8Buf.slice(0, Number(toSend)) + } + }()) + } + + async startPerfOnStream (peer: PeerId, sendBytes: bigint, recvBytes: bigint, options: AbortOptions = {}): Promise { + log('dialing %s to %p', this.protocol, peer) + + const uint8Buf = new Uint8Array(this.databuf) + + const connection = await this.components.connectionManager.openConnection(peer, options) + const signal = options.signal + const stream = await connection.newStream([this.protocol], { + signal + }) + + // Convert sendBytes to uint64 big endian buffer + const view = new DataView(this.databuf) + view.setBigInt64(0, recvBytes, false) + + await stream.sink((async function * () { + // Send the number of bytes to receive + yield uint8Buf.slice(0, 8) + // Send the number of bytes to send + while (sendBytes > 0n) { + let toSend: bigint = writeBlockSize + if (toSend > sendBytes) { + toSend = sendBytes + } + sendBytes = sendBytes - toSend + yield uint8Buf.slice(0, Number(toSend)) + } + })()) + + // Read the received bytes + let actualRecvdBytes = BigInt(0) + for await (const buf of stream.source) { + actualRecvdBytes += BigInt(buf.length) + } + + if (actualRecvdBytes !== recvBytes) { + throw new Error(`Expected to receive ${recvBytes} bytes, but received ${actualRecvdBytes}`) + } + + stream.close() + } + + // measureDownloadBandwidth returns the measured bandwidth in bits per second + async measureDownloadBandwidth (peer: PeerId, size: bigint) { + const now = Date.now() + await this.startPerfOnStream(peer, 0n, size) + return Number((8000n * size) / BigInt(Date.now() - now)) + } + + // measureUploadBandwidth returns the measured bandwidth in bit per second + async measureUploadBandwidth (peer: PeerId, size: bigint) { + const now = Date.now() + await this.startPerfOnStream(peer, size, 0n) + return Number((8000n * size) / BigInt(Date.now() - now)) + } +} diff --git a/test/perf/index.spec.ts b/test/perf/index.spec.ts new file mode 100644 index 0000000000..7d1aaaa40f --- /dev/null +++ b/test/perf/index.spec.ts @@ -0,0 +1,93 @@ +/* eslint-env mocha */ + +import { expect } from 'aegir/chai' +import Peers from '../fixtures/peers.js' +import { PerfService } from '../../src/perf/index.js' +import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-mocks' +import { createFromJSON } from '@libp2p/peer-id-factory' +import { DefaultConnectionManager } from '../../src/connection-manager/index.js' +import { start, stop } from '@libp2p/interfaces/startable' +import { CustomEvent } from '@libp2p/interfaces/events' +import { PersistentPeerStore } from '@libp2p/peer-store' +import { MemoryDatastore } from 'datastore-core' +import { DefaultComponents } from '../../src/components.js' + +async function createComponents (index: number): Promise { + const peerId = await createFromJSON(Peers[index]) + + const components = new DefaultComponents({ + peerId, + registrar: mockRegistrar(), + upgrader: mockUpgrader(), + datastore: new MemoryDatastore() + }) + components.peerStore = new PersistentPeerStore(components) + components.connectionManager = new DefaultConnectionManager(components, { + minConnections: 50, + maxConnections: 1000, + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 + }) + + return components +} + +describe('perf', () => { + let localComponents: DefaultComponents + let remoteComponents: DefaultComponents + + beforeEach(async () => { + localComponents = await createComponents(0) + remoteComponents = await createComponents(1) + + await Promise.all([ + start(localComponents), + start(remoteComponents) + ]) + }) + + afterEach(async () => { + await Promise.all([ + stop(localComponents), + stop(remoteComponents) + ]) + }) + + it('should run perf', async () => { + const client = new PerfService(localComponents) + const server = new PerfService(remoteComponents) + + await start(client) + await start(server) + + // simulate connection between nodes + const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) + localComponents.upgrader.dispatchEvent(new CustomEvent('connection', { detail: localToRemote })) + remoteComponents.upgrader.dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal })) + + // Run Perf + await expect(client.startPerfOnStream(remoteComponents.peerId, 1n << 10n, 1n << 10n)).to.eventually.be.fulfilled() + }) + + it('local benchmark', async () => { + const client = new PerfService(localComponents) + const server = new PerfService(remoteComponents) + + await start(client) + await start(server) + + // simulate connection between nodes + const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) + localComponents.upgrader.dispatchEvent(new CustomEvent('connection', { detail: localToRemote })) + remoteComponents.upgrader.dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal })) + + // Run Perf + const downloadBandwidth = await client.measureDownloadBandwidth(remoteComponents.peerId, 10n << 20n) + // eslint-disable-next-line no-console + console.log('Download bandwidth: ', downloadBandwidth >> 10, ' kiB/s') + + const uploadBandwidth = await client.measureDownloadBandwidth(remoteComponents.peerId, 10n << 20n) + // eslint-disable-next-line no-console + console.log('Upload bandwidth: ', uploadBandwidth >> 10, ' kiB/s') + }) +}) From 017292b84e9707fe35501b14fd1cfd4414995d47 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 28 Feb 2023 21:32:57 -0800 Subject: [PATCH 2/5] Expose necessary types --- package.json | 6 +++++- src/index.ts | 6 +++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index a13f1f06bf..0ce1b90792 100644 --- a/package.json +++ b/package.json @@ -52,6 +52,10 @@ "types": "./src/index.d.ts", "import": "./dist/src/index.js" }, + "./perf": { + "types": "./src/perf/index.d.ts", + "import": "./dist/src/perf/index.js" + }, "./insecure": { "types": "./dist/src/insecure/index.d.ts", "import": "./dist/src/insecure/index.js" @@ -203,4 +207,4 @@ "browser": { "nat-api": false } -} +} \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index db2a467ee9..1c9226dff4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -14,7 +14,7 @@ * ``` */ -import { createLibp2pNode } from './libp2p.js' +import { createLibp2pNode, type Libp2pNode } from './libp2p.js' import type { RecursivePartial } from '@libp2p/interfaces' import type { TransportManagerInit } from './transport-manager.js' import type { IdentifyServiceInit } from './identify/index.js' @@ -168,7 +168,7 @@ export interface Libp2pEvents { 'peer:discovery': CustomEvent } -export type { Libp2p } +export type { Libp2p, Libp2pNode } export type Libp2pOptions = RecursivePartial & { start?: boolean } @@ -197,7 +197,7 @@ export type Libp2pOptions = RecursivePartial & { start?: boolean } * const libp2p = await createLibp2p(options) * ``` */ -export async function createLibp2p (options: Libp2pOptions): Promise { +export async function createLibp2p (options: Libp2pOptions): Promise { const node = await createLibp2pNode(options) if (options.start !== false) { From d4383ac08740d292fc86e5c470b5f9680a38dfb7 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 28 Feb 2023 21:33:15 -0800 Subject: [PATCH 3/5] Add benchmark example --- examples/benchmark/src/index.ts | 133 ++++++++++++++++++++++++++++++++ examples/tsconfig.json | 10 +++ 2 files changed, 143 insertions(+) create mode 100644 examples/benchmark/src/index.ts create mode 100644 examples/tsconfig.json diff --git a/examples/benchmark/src/index.ts b/examples/benchmark/src/index.ts new file mode 100644 index 0000000000..3677faa2fe --- /dev/null +++ b/examples/benchmark/src/index.ts @@ -0,0 +1,133 @@ +/* eslint-disable no-console */ + +import { tcp } from '@libp2p/tcp' +import { yamux } from '@chainsafe/libp2p-yamux' +import { mplex } from '@libp2p/mplex' +import { noise } from '@chainsafe/libp2p-noise' +import { createLibp2p, type Libp2pOptions, type Libp2pNode } from 'libp2p' +import { PerfService } from 'libp2p/perf' +import { plaintext } from 'libp2p/insecure' + +async function newNode (options: Libp2pOptions): Promise { + return await createLibp2p({ + ...options + }) +} + +async function benchmarkWithOptions (serverOptions: Libp2pOptions, clientOptions: Libp2pOptions) { + const server = await newNode(serverOptions) + const client = await newNode(clientOptions) + + const serverPerf = new PerfService(server.components) + await serverPerf.start() + + const clientPerf = new PerfService(client.components) + await clientPerf.start() + await client.dial(server.getMultiaddrs()[0]) + + // Warmup + await clientPerf.measureDownloadBandwidth(server.peerId, 10n << 20n) + await clientPerf.measureUploadBandwidth(server.peerId, 10n << 20n) + + const downloadBandwidth = await clientPerf.measureDownloadBandwidth(server.peerId, 100n << 20n) + console.log('Download bandwidth is (mbits/s)', downloadBandwidth >> 20) + const uploadBandwidth = await clientPerf.measureDownloadBandwidth(server.peerId, 50n << 20n) + console.log('Upload bandwidth is (mbits/s)', uploadBandwidth >> 20) + + await clientPerf.stop() + await serverPerf.stop() + + server.stop() + client.stop() +} + +async function run () { + const testcases = [ + { + name: 'TCP+mplex+noise', + baseOptions: { + transports: [ + tcp() + ], + streamMuxers: [ + mplex() + ], + connectionEncryption: [ + noise() + ] + }, + serverOptions: { + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] + } + } + }, + { + name: 'TCP+yamux+noise', + baseOptions: { + transports: [ + tcp() + ], + streamMuxers: [ + yamux() + ], + connectionEncryption: [ + noise() + ] + }, + serverOptions: { + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] + } + } + }, + { + name: 'TCP+yamux+plaintext', + baseOptions: { + transports: [ + tcp() + ], + streamMuxers: [ + yamux() + ], + connectionEncryption: [ + plaintext() + ] + }, + serverOptions: { + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] + } + } + }, + { + name: 'TCP+mplex+plaintext', + baseOptions: { + transports: [ + tcp() + ], + streamMuxers: [ + mplex() + ], + connectionEncryption: [ + plaintext() + ] + }, + serverOptions: { + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] + } + } + } + ] + + for (const testcase of testcases) { + console.log(testcase.name) + await benchmarkWithOptions({ + ...testcase.baseOptions, + ...testcase.serverOptions + }, testcase.baseOptions) + } +} + +void run() diff --git a/examples/tsconfig.json b/examples/tsconfig.json new file mode 100644 index 0000000000..45e63f3060 --- /dev/null +++ b/examples/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "benchmark", + ], + "exclude": [] +} \ No newline at end of file From f05b115d860e380330ab861e8bbf8ae4b8a238b8 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 2 Mar 2023 09:35:48 -0800 Subject: [PATCH 4/5] Use subarray --- src/perf/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/perf/index.ts b/src/perf/index.ts index 83c7362113..0a06a7be83 100644 --- a/src/perf/index.ts +++ b/src/perf/index.ts @@ -70,7 +70,7 @@ export class PerfService implements Startable { toSend = bytesToSendBack } bytesToSendBack = bytesToSendBack - toSend - yield uint8Buf.slice(0, Number(toSend)) + yield uint8Buf.subarray(0, Number(toSend)) } }()) } @@ -92,7 +92,7 @@ export class PerfService implements Startable { await stream.sink((async function * () { // Send the number of bytes to receive - yield uint8Buf.slice(0, 8) + yield uint8Buf.subarray(0, 8) // Send the number of bytes to send while (sendBytes > 0n) { let toSend: bigint = writeBlockSize @@ -100,7 +100,7 @@ export class PerfService implements Startable { toSend = sendBytes } sendBytes = sendBytes - toSend - yield uint8Buf.slice(0, Number(toSend)) + yield uint8Buf.subarray(0, Number(toSend)) } })()) From 133c574044a3626250b41e5e93691080aafb8cfe Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 2 Mar 2023 10:11:02 -0800 Subject: [PATCH 5/5] Fix arithmetic --- examples/benchmark/src/index.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/benchmark/src/index.ts b/examples/benchmark/src/index.ts index 3677faa2fe..305ff4145f 100644 --- a/examples/benchmark/src/index.ts +++ b/examples/benchmark/src/index.ts @@ -30,15 +30,15 @@ async function benchmarkWithOptions (serverOptions: Libp2pOptions, clientOptions await clientPerf.measureUploadBandwidth(server.peerId, 10n << 20n) const downloadBandwidth = await clientPerf.measureDownloadBandwidth(server.peerId, 100n << 20n) - console.log('Download bandwidth is (mbits/s)', downloadBandwidth >> 20) + console.log('Download bandwidth is (mbits/s)', BigInt(downloadBandwidth) >> 20n) const uploadBandwidth = await clientPerf.measureDownloadBandwidth(server.peerId, 50n << 20n) - console.log('Upload bandwidth is (mbits/s)', uploadBandwidth >> 20) + console.log('Upload bandwidth is (mbits/s)', BigInt(uploadBandwidth) >> 20n) await clientPerf.stop() await serverPerf.stop() - server.stop() - client.stop() + await server.stop() + await client.stop() } async function run () {