Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2a605a8
Added skeleton implementation
mralj Jul 25, 2025
f3ae3b1
Improvements, fixes and cleanups
mralj Jul 28, 2025
b7f3a67
P2: Improvements, fixes and cleanups
mralj Jul 28, 2025
7844364
More improvements and cleanups
mralj Jul 29, 2025
1f7dc19
Added test and bugfixes
mralj Jul 30, 2025
c8d7804
More improvements, code is in bad shape though
mralj Jul 31, 2025
c8f4c0d
post rebase fix
mralj Aug 7, 2025
21f9c71
Refactor
mralj Aug 7, 2025
7d0a204
typo fix
mralj Aug 7, 2025
ee91446
Bugfixes
mralj Aug 8, 2025
0085d26
cleanups
mralj Aug 17, 2025
4d947e7
test bugfix
mralj Aug 18, 2025
4561139
More bugfixes and cleanups
mralj Aug 18, 2025
b96c2ba
Testing and code improvements
mralj Sep 3, 2025
bfd3ab6
more tests
mralj Sep 4, 2025
d7d5b86
cleanup
mralj Sep 4, 2025
a178915
Transition to smart peers test
mralj Sep 5, 2025
2173702
more smart peer tests
mralj Sep 5, 2025
0c591c1
better naming
mralj Sep 5, 2025
7620b1e
testing bad peers
mralj Sep 5, 2025
5ae72bf
small test improvements
mralj Sep 8, 2025
edb9dd2
proper handlingof reqeust_limit_exceeded and cleanups
mralj Sep 8, 2025
ce5e6f2
rate limiting tests
mralj Sep 8, 2025
8ceb2e6
test aboritng
mralj Sep 8, 2025
f77019a
yields txs one by one + tx validation
mralj Sep 9, 2025
78d856c
added tests for tx validation and some cleanups and fixes
mralj Sep 9, 2025
48f19a4
proper pinned peer handling
mralj Sep 10, 2025
533a3fa
added comments and cleanups
mralj Sep 10, 2025
4f4a04d
Cleanup
mralj Sep 10, 2025
21aef54
feat(refactor): Fast Batch Tx Connector
Sep 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions yarn-project/foundation/src/promise/utils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { DateProvider } from '../timer/date.js';

export type PromiseWithResolvers<T> = {
promise: Promise<T>;
resolve: (value: T) => void;
Expand Down Expand Up @@ -27,3 +29,25 @@ export function promiseWithResolvers<T>(): PromiseWithResolvers<T> {
reject,
};
}

/**
* Helper function that waits for a predicate to become true.
* @param pred - The predicate function to evaluate
* @param interval - The interval in milliseconds to check the predicate (default: 10ms)
* @param timeout - The maximum time in milliseconds to wait before rejecting (default: 5000ms)
* @param dateProvider - An optional DateProvider instance for getting the current time (default: new DateProvider())
*/
export function waitFor(pred: () => boolean, interval = 10, timeout = 5_000, dateProvider = new DateProvider()) {
const started = dateProvider.now();
return new Promise<void>((resolve, reject) => {
const id = setInterval(() => {
if (pred()) {
clearInterval(id);
resolve();
} else if (dateProvider.now() - started >= timeout) {
clearInterval(id);
reject(new Error('waitFor: timeout'));
}
}, interval);
});
}
5 changes: 5 additions & 0 deletions yarn-project/foundation/src/queue/semaphore.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { FifoMemoryQueue } from './fifo_memory_queue.js';

export interface ISemaphore {
acquire(): Promise<void>;
release(): void;
}

/**
* Allows the acquiring of up to `size` tokens before calls to acquire block, waiting for a call to release().
*/
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/client/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export async function createP2PClient<T extends P2PClientType>(
}

const txCollection = new TxCollection(
p2pService,
p2pService.getBatchTxRequesterService(),
nodeSources,
l1Constants,
mempools.txPool,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import type { EpochCache } from '@aztec/epoch-cache';
import { times } from '@aztec/foundation/collection';
import { Secp256k1Signer } from '@aztec/foundation/crypto';
import { Fr } from '@aztec/foundation/fields';
import { type Logger, createLogger } from '@aztec/foundation/log';
import { retryUntil } from '@aztec/foundation/retry';
import { sleep } from '@aztec/foundation/sleep';
import { emptyChainConfig } from '@aztec/stdlib/config';
import type { WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server';
import { makeBlockProposal, makeHeader, mockTx } from '@aztec/stdlib/testing';
import { Tx, TxHash } from '@aztec/stdlib/tx';

import { describe, expect, it, jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';

import type { P2PClient } from '../../client/p2p_client.js';
import { type P2PConfig, getP2PDefaultConfig } from '../../config.js';
import type { AttestationPool } from '../../mem_pools/attestation_pool/attestation_pool.js';
import type { TxPool } from '../../mem_pools/tx_pool/index.js';
import { BatchTxRequester } from '../../services/reqresp/batch-tx-requester/batch_tx_requester.js';
import type { ConnectionSampler } from '../../services/reqresp/connection-sampler/connection_sampler.js';
import { generatePeerIdPrivateKeys } from '../../test-helpers/generate-peer-id-private-keys.js';
import { getPorts } from '../../test-helpers/get-ports.js';
import { makeEnrs } from '../../test-helpers/make-enrs.js';
import { makeAndStartTestP2PClient, makeAndStartTestP2PClients } from '../../test-helpers/make-test-p2p-clients.js';

const TEST_TIMEOUT = 30_000;
jest.setTimeout(TEST_TIMEOUT);

describe('p2p client integration batch txs', () => {
let txPool: MockProxy<TxPool>;
let attestationPool: MockProxy<AttestationPool>;
let epochCache: MockProxy<EpochCache>;
let worldState: MockProxy<WorldStateSynchronizer>;

let connectionSampler: MockProxy<ConnectionSampler>;

let logger: Logger;
let p2pBaseConfig: P2PConfig;

let clients: P2PClient[] = [];

beforeEach(() => {
clients = [];
txPool = mock<TxPool>();
attestationPool = mock<AttestationPool>();
epochCache = mock<EpochCache>();
worldState = mock<WorldStateSynchronizer>();
connectionSampler = mock<ConnectionSampler>();

logger = createLogger('p2p:test:integration:batch');
p2pBaseConfig = { ...emptyChainConfig, ...getP2PDefaultConfig() };

//@ts-expect-error - we want to mock the getEpochAndSlotInNextL1Slot method, mocking ts is enough
epochCache.getEpochAndSlotInNextL1Slot.mockReturnValue({ ts: BigInt(0) });
epochCache.getRegisteredValidators.mockResolvedValue([]);

txPool.hasTxs.mockResolvedValue([]);
txPool.getAllTxs.mockImplementation(() => {
return Promise.resolve([] as Tx[]);
});
txPool.addTxs.mockResolvedValue(1);
txPool.getTxsByHash.mockImplementation(() => {
return Promise.resolve([] as Tx[]);
});

worldState.status.mockResolvedValue({
state: mock(),
syncSummary: {
latestBlockNumber: 0,
latestBlockHash: '',
finalizedBlockNumber: 0,
treesAreSynched: false,
oldestHistoricBlockNumber: 0,
},
});
logger.info(`Starting test ${expect.getState().currentTestName}`);
});

afterEach(async () => {
logger.info(`Tearing down state for ${expect.getState().currentTestName}`);
await shutdown(clients);
logger.info('Shut down p2p clients');

jest.restoreAllMocks();
jest.resetAllMocks();
jest.clearAllMocks();

clients = [];
});

// Shutdown all test clients
const shutdown = async (clients: P2PClient[]) => {
await Promise.all(clients.map(client => client.stop()));
await sleep(1000);
};

const createBlockProposal = (blockNumber: number, blockHash: Fr, txHashes: TxHash[]) => {
return makeBlockProposal({
signer: Secp256k1Signer.random(),
header: makeHeader(1, blockNumber),
archive: blockHash,
txHashes,
});
};

const setupClients = async (numberOfPeers: number, txPoolMocks?: MockProxy<TxPool>[]) => {
if (txPoolMocks) {
const peerIdPrivateKeys = generatePeerIdPrivateKeys(numberOfPeers);
let ports = [];
while (true) {
try {
ports = await getPorts(numberOfPeers);
break;
} catch {
await sleep(1000);
}
}
const peerEnrs = await makeEnrs(peerIdPrivateKeys, ports, p2pBaseConfig);

for (let i = 0; i < numberOfPeers; i++) {
const client = await makeAndStartTestP2PClient(peerIdPrivateKeys[i], ports[i], peerEnrs, {
p2pBaseConfig,
mockAttestationPool: attestationPool,
mockTxPool: txPoolMocks[i],
mockEpochCache: epochCache,
mockWorldState: worldState,
logger: createLogger(`p2p:${i}`),
});
clients.push(client);
}

return;
}

clients = (
await makeAndStartTestP2PClients(numberOfPeers, {
p2pBaseConfig,
mockAttestationPool: attestationPool,
mockTxPool: txPool,
mockEpochCache: epochCache,
mockWorldState: worldState,
logger,
})
).map(x => x.client);
};

async function makeSureClientsAreStarted() {
// Give the nodes time to discover each other
await sleep(4000);
for (const c of clients) {
await retryUntil(async () => (await c.getPeers()).length == clients.length - 1, 'peers discovered', 12, 0.5);
}

logger.info('Finished waiting for clients to connect');
}

it('batch requester fetches all missing txs from multiple peers', async () => {
const NUMBER_OF_PEERS = 4;

const txCount = 20;
const txs = await Promise.all(times(txCount, () => mockTx()));
const txHashes = await Promise.all(txs.map(tx => tx.getTxHash()));

const blockNumber = 5;
const blockHash = Fr.random();
const blockProposal = createBlockProposal(blockNumber, blockHash, txHashes);

// Distribute transactions across peers (simulating partial knowledge)
// Peer 0 has no txs (client requesting)
const peerTxDistribution = [
{ start: 0, end: 0 }, // Peer 0 (requester)
{ start: 0, end: 11 },
{ start: 6, end: 15 },
{ start: 10, end: 20 }, // Peer 3
];

// Create individual txPool mocks for each peer
const txPoolMocks: MockProxy<TxPool>[] = [];
for (let i = 0; i < NUMBER_OF_PEERS; i++) {
const peerTxPool = mock<TxPool>();
const { start, end } = peerTxDistribution[i];
const peerTxs = txs.slice(start, end);
const peerTxHashSet = new Set(peerTxs.map(tx => tx.txHash.toString()));

peerTxPool.hasTxs.mockImplementation((hashes: TxHash[]) => {
return Promise.resolve(hashes.map(h => peerTxHashSet.has(h.toString())));
});
peerTxPool.getTxsByHash.mockImplementation((hashes: TxHash[]) => {
return Promise.resolve(hashes.map(hash => peerTxs.find(t => t.txHash.equals(hash))));
});

txPoolMocks.push(peerTxPool);
}

await setupClients(NUMBER_OF_PEERS, txPoolMocks);
await makeSureClientsAreStarted();

const peerIds = clients.map(client => (client as any).p2pService.node.peerId);
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peerIds);

attestationPool.getBlockProposal.mockResolvedValue(blockProposal);

// Client 0 is missing all transactions
const missingTxHashes = txHashes;

// Create BatchTxRequester instance
const [client0] = clients;
const requester = new BatchTxRequester(
missingTxHashes,
blockProposal,
undefined, // no pinned peer
5_000,
(client0 as any).p2pService.reqresp,
connectionSampler,
() => Promise.resolve(true),
logger,
);

const fetchedTxs = await BatchTxRequester.collectAllTxs(requester.run());

// Verify all transactions were fetched
expect(fetchedTxs).toBeDefined();
const fetchedHashes = await Promise.all(fetchedTxs!.map(tx => tx.getTxHash()));
expect(
new Set(fetchedHashes.map(h => h.toString())).difference(new Set(txHashes.map(h => h.toString()))).size,
).toBe(0);
});
});
37 changes: 36 additions & 1 deletion yarn-project/p2p/src/services/dummy_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import type { Gossipable, PeerErrorSeverity } from '@aztec/stdlib/p2p';
import { Tx, TxHash } from '@aztec/stdlib/tx';

import type { ENR } from '@chainsafe/enr';
import type { PeerId } from '@libp2p/interface';
import type { Libp2p, PeerId } from '@libp2p/interface';
import EventEmitter from 'events';

import type { PeerManagerInterface } from './peer-manager/interface.js';
import type { BatchTxRequesterLibP2PService } from './reqresp/batch-tx-requester/interface.js';
import type { P2PReqRespConfig } from './reqresp/config.js';
import type { ConnectionSampler } from './reqresp/connection-sampler/connection_sampler.js';
import { type AuthRequest, StatusMessage } from './reqresp/index.js';
import type {
ReqRespInterface,
Expand Down Expand Up @@ -100,6 +102,15 @@ export class DummyP2PService implements P2PService {
return Promise.resolve([]);
}

public sendRequestToPeer(
_peerId: PeerId,
_subProtocol: ReqRespSubProtocol,
_payload: Buffer,
_dialTimeout?: number,
): Promise<ReqRespResponse> {
return Promise.resolve({ status: ReqRespStatus.SUCCESS, data: Buffer.from([]) });
}

/**
* Returns the ENR of the peer.
* @returns The ENR of the peer, otherwise undefined.
Expand All @@ -112,6 +123,10 @@ export class DummyP2PService implements P2PService {
return Promise.resolve();
}

validatePropagatedTx(_tx: Tx, _peerId: PeerId): Promise<boolean> {
return Promise.resolve(true);
}

addReqRespSubProtocol(
_subProtocol: ReqRespSubProtocol,
_handler: ReqRespSubProtocolHandler,
Expand All @@ -126,6 +141,17 @@ export class DummyP2PService implements P2PService {

//this is no-op
registerThisValidatorAddresses(_address: EthAddress[]): void {}

/**
* Get dummy BatchTxRequesterLibP2PService for testing
*/
getBatchTxRequesterService(): BatchTxRequesterLibP2PService {
return {
reqResp: this, // The dummy service implements ReqRespInterface
connectionSampler: new DummyReqResp().getConnectionSampler(),
txValidator: (_tx: Tx, _peerId: PeerId) => Promise.resolve(true), // Always return true for dummy
};
}
}

/**
Expand Down Expand Up @@ -262,6 +288,15 @@ export class DummyReqResp implements ReqRespInterface {
return Promise.resolve({ status: ReqRespStatus.SUCCESS, data: Buffer.from([]) });
}

/**
* Get dummy connection sampler for testing
*/
getConnectionSampler(): Pick<ConnectionSampler, 'getPeerListSortedByConnectionCountAsc'> {
return {
getPeerListSortedByConnectionCountAsc: () => [],
};
}

addSubProtocol(
_subProtocol: ReqRespSubProtocol,
_handler: ReqRespSubProtocolHandler,
Expand Down
Loading
Loading