Skip to content

Commit a93a1e6

Browse files
feat(internal): Rate limiting in ymax planner when fetching pending transactions (#12634)
1 parent 2ac186c commit a93a1e6

5 files changed

Lines changed: 314 additions & 28 deletions

File tree

services/ymax-planner/src/engine.ts

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ import type {
6767
HandlePendingTxOpts,
6868
} from './pending-tx-manager.ts';
6969
import { handlePendingTx } from './pending-tx-manager.ts';
70+
import rateLimitedSource from './rate-limited-source.ts';
7071
import type { BalanceQueryPowers } from './plan-deposit.ts';
7172
import {
7273
getNonDustBalances,
@@ -173,6 +174,7 @@ export type Powers = {
173174
console?: Pick<Console, 'debug' | 'info' | 'log' | 'warn' | 'error'>;
174175
evmCtx: Omit<EvmContext, 'signingSmartWalletKit' | 'fetch'>;
175176
rpc: CosmosRPCClient;
177+
setTimeout: typeof globalThis.setTimeout;
176178
spectrumBlockchain: SpectrumBlockchainSdk;
177179
spectrumChainIds: Partial<Record<SupportedChain, string>>;
178180
evmTokenAddresses: Partial<Record<InstrumentId, EvmAddress>>;
@@ -801,34 +803,48 @@ export const startEngine = async (
801803
console.warn(`Found ${pendingTxKeys.length} pending transactions`);
802804

803805
const initialPendingTxData: PendingTxRecord[] = [];
804-
await makeWorkPool(pendingTxKeys, undefined, async (txId: TxId) => {
805-
const path = `${pendingTxPathPrefix}.${txId}`;
806-
await null;
807-
let streamCellJson;
808-
let data;
809-
try {
810-
const metaResponse = await readStorageMeta(query.vstorage, path, 'data', {
811-
retries: 4,
812-
});
813-
streamCellJson = metaResponse.result.value;
814-
const streamCell = parseStreamCell(streamCellJson, path);
815-
const marshalledData = parseStreamCellValue(streamCell, -1, path);
816-
data = marshaller.fromCapData(marshalledData);
817-
if (
818-
data?.status !== TxStatus.PENDING ||
819-
!RESOLVER_SUPPORTED_TRANSACTIONS.includes(data.type)
820-
)
821-
return;
822-
mustMatch(harden(data), PublishedTxShape, path);
823-
initialPendingTxData.push({
824-
blockHeight: BigInt(streamCell.blockHeight),
825-
tx: { txId, ...data },
826-
});
827-
} catch (err) {
828-
const errLabel = `🚨 Failed to read old pending tx ${path}`;
829-
console.error(errLabel, data || streamCellJson, err);
830-
}
831-
}).done;
806+
const capacity = 10;
807+
const throttledPendingTxKeys = rateLimitedSource({
808+
policy: { quota: capacity, windowMs: 1000 },
809+
powers: { now: powers.now, setTimeout: powers.setTimeout },
810+
source: pendingTxKeys as Array<string>,
811+
});
812+
813+
await makeWorkPool(
814+
throttledPendingTxKeys,
815+
{ capacity },
816+
async (txId: TxId) => {
817+
const path = `${pendingTxPathPrefix}.${txId}`;
818+
await null;
819+
let streamCellJson;
820+
let data;
821+
try {
822+
const metaResponse = await readStorageMeta(
823+
query.vstorage,
824+
path,
825+
'data',
826+
{ retries: 4 },
827+
);
828+
streamCellJson = metaResponse.result.value;
829+
const streamCell = parseStreamCell(streamCellJson, path);
830+
const marshalledData = parseStreamCellValue(streamCell, -1, path);
831+
data = marshaller.fromCapData(marshalledData);
832+
if (
833+
data?.status !== TxStatus.PENDING ||
834+
!RESOLVER_SUPPORTED_TRANSACTIONS.includes(data.type)
835+
)
836+
return;
837+
mustMatch(harden(data), PublishedTxShape, path);
838+
initialPendingTxData.push({
839+
blockHeight: BigInt(streamCell.blockHeight),
840+
tx: { txId, ...data },
841+
});
842+
} catch (err) {
843+
const errLabel = `🚨 Failed to read old pending tx ${path}`;
844+
console.error(errLabel, data || streamCellJson, err);
845+
}
846+
},
847+
).done;
832848

833849
if (initialPendingTxData.length > 0) {
834850
// Process initial transactions in lookback mode upon planner startup
@@ -931,4 +947,5 @@ export const startEngine = async (
931947
}
932948
Fail`⚠️ rpc.subscribeAll finished`;
933949
};
950+
934951
harden(startEngine);

services/ymax-planner/src/main.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ export const main = async (
295295
...evmCtx,
296296
},
297297
rpc,
298+
setTimeout,
298299
spectrumChainIds,
299300
evmTokenAddresses,
300301
spectrumBlockchain,
@@ -327,4 +328,5 @@ export const main = async (
327328
});
328329
});
329330
};
331+
330332
harden(main);
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Rate-limited async iterator wrapper.
3+
*
4+
* Releases items from `source` at a rate of at most `quota` items per any
5+
* `windowMs` sliding window, with bursts of up to `quota` allowed at the head
6+
* of an idle window. Naming follows the IETF `RateLimit` header draft's
7+
* quota/window vocabulary (q, w) but uses `windowMs` for sub-second precision.
8+
*
9+
* Time and timer access are passed in as powers — no ambient clock or
10+
* `setTimeout`.
11+
*
12+
* @see https://datatracker.ietf.org/doc/draft-ietf-httpapi-ratelimit-headers/
13+
*/
14+
15+
type RateLimitPolicy = {
16+
/** Maximum items released per window. */
17+
quota: number;
18+
/** Sliding window length in milliseconds. */
19+
windowMs: number;
20+
};
21+
22+
type RateLimitPowers = {
23+
now: () => number;
24+
setTimeout: typeof globalThis.setTimeout;
25+
};
26+
27+
const rateLimitedSource = <T>({
28+
policy,
29+
powers,
30+
source,
31+
}: {
32+
policy: RateLimitPolicy;
33+
powers: RateLimitPowers;
34+
source: Iterable<T> | AsyncIterable<T>;
35+
}) => {
36+
const { quota, windowMs } = policy;
37+
const { now, setTimeout } = powers;
38+
39+
if (!(Number.isInteger(quota) && quota > 0)) {
40+
throw RangeError('quota must be a positive integer');
41+
}
42+
43+
if (!(Number.isFinite(windowMs) && windowMs > 0)) {
44+
throw RangeError('windowMs must be a positive finite number');
45+
}
46+
47+
const delay = (ms: number) =>
48+
new Promise<void>(resolve => setTimeout(resolve, ms));
49+
50+
async function* generate(): AsyncGenerator<T, void, void> {
51+
const releases: number[] = [];
52+
53+
for await (const item of source) {
54+
const remainingCapacity = quota - releases.length;
55+
const t = now();
56+
57+
if (remainingCapacity > 0) {
58+
releases.push(t);
59+
} else {
60+
if (remainingCapacity !== 0) throw Error('internal: quota exceeded');
61+
62+
const earliest = releases.shift()!;
63+
const effective = Math.max(t, earliest + windowMs);
64+
releases.push(effective);
65+
const wait = effective - t;
66+
if (wait) await delay(wait);
67+
}
68+
69+
yield item;
70+
}
71+
}
72+
73+
return generate();
74+
};
75+
76+
export default rateLimitedSource;

services/ymax-planner/test/mocks.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ export const createMockEnginePowers = (): EnginePowers => ({
105105
walletStore: {} as any,
106106
getWalletInvocationUpdate: async () => undefined,
107107
now: () => NaN,
108+
setTimeout: globalThis.setTimeout,
108109
gasEstimator: {} as any,
109110
usdcTokensByChain: {},
110111
chainNameToChainIdMap: CaipChainIds.testnet,
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
import test from 'ava';
2+
3+
import rateLimitedSource from '../src/rate-limited-source.ts';
4+
5+
const makeFakeClock = () => {
6+
let clockNow = 0;
7+
8+
return {
9+
advance: (ms: number) => {
10+
clockNow += ms;
11+
},
12+
setTimeout: ((cb: () => void, ms?: number) => {
13+
clockNow += ms ?? 0;
14+
cb();
15+
return 0;
16+
}) as typeof globalThis.setTimeout,
17+
now: () => clockNow,
18+
};
19+
};
20+
21+
test('rateLimitedSource: bursts up to quota, then waits one window', async t => {
22+
const clock = makeFakeClock();
23+
const items = [0, 1, 2, 3, 4, 5, 6, 7, 8];
24+
const releases: Array<{ item: number; at: number }> = [];
25+
for await (const item of rateLimitedSource({
26+
source: items,
27+
policy: { quota: 3, windowMs: 100 },
28+
powers: clock,
29+
})) {
30+
releases.push({ item, at: clock.now() });
31+
}
32+
t.deepEqual(releases, [
33+
{ item: 0, at: 0 },
34+
{ item: 1, at: 0 },
35+
{ item: 2, at: 0 },
36+
{ item: 3, at: 100 },
37+
{ item: 4, at: 100 },
38+
{ item: 5, at: 100 },
39+
{ item: 6, at: 200 },
40+
{ item: 7, at: 200 },
41+
{ item: 8, at: 200 },
42+
]);
43+
});
44+
45+
test('rateLimitedSource: an idle period grants the next item without delay', async t => {
46+
const clock = makeFakeClock();
47+
let setTimeoutCalls = 0;
48+
const powers = {
49+
now: clock.now,
50+
setTimeout: ((cb: () => void, ms?: number) => {
51+
setTimeoutCalls += 1;
52+
return clock.setTimeout(cb, ms);
53+
}) as typeof globalThis.setTimeout,
54+
};
55+
const iter = rateLimitedSource({
56+
source: [0, 1, 2],
57+
policy: { quota: 1, windowMs: 100 },
58+
powers,
59+
});
60+
61+
t.is((await iter.next()).value, 0);
62+
t.is(clock.now(), 0);
63+
t.is(setTimeoutCalls, 0);
64+
65+
clock.advance(150);
66+
t.is((await iter.next()).value, 1);
67+
t.is(clock.now(), 150);
68+
t.is(setTimeoutCalls, 0);
69+
70+
t.is((await iter.next()).value, 2);
71+
t.is(clock.now(), 250);
72+
t.is(setTimeoutCalls, 1);
73+
});
74+
75+
test('rateLimitedSource: sliding window spaces by oldest in-window release', async t => {
76+
const clock = makeFakeClock();
77+
const iter = rateLimitedSource({
78+
source: [0, 1, 2, 3],
79+
policy: { quota: 2, windowMs: 100 },
80+
powers: clock,
81+
});
82+
83+
await iter.next();
84+
t.is(clock.now(), 0);
85+
86+
clock.advance(50);
87+
await iter.next();
88+
t.is(clock.now(), 50);
89+
90+
await iter.next();
91+
t.is(clock.now(), 100);
92+
93+
await iter.next();
94+
t.is(clock.now(), 150);
95+
});
96+
97+
test('rateLimitedSource: empty source completes without invoking setTimeout', async t => {
98+
const clock = makeFakeClock();
99+
let setTimeoutCalls = 0;
100+
const powers = {
101+
now: clock.now,
102+
setTimeout: ((cb: () => void, ms?: number) => {
103+
setTimeoutCalls += 1;
104+
return clock.setTimeout(cb, ms);
105+
}) as typeof globalThis.setTimeout,
106+
};
107+
for await (const _ of rateLimitedSource({
108+
source: [],
109+
policy: { quota: 1, windowMs: 100 },
110+
powers,
111+
})) {
112+
t.fail('should not emit');
113+
}
114+
t.is(setTimeoutCalls, 0);
115+
t.is(clock.now(), 0);
116+
});
117+
118+
test('rateLimitedSource: accepts async iterables', async t => {
119+
const clock = makeFakeClock();
120+
async function* gen() {
121+
yield 'a';
122+
yield 'b';
123+
yield 'c';
124+
}
125+
const releases: Array<{ item: string; at: number }> = [];
126+
for await (const item of rateLimitedSource({
127+
source: gen(),
128+
policy: { quota: 1, windowMs: 100 },
129+
powers: clock,
130+
})) {
131+
releases.push({ item, at: clock.now() });
132+
}
133+
t.deepEqual(releases, [
134+
{ item: 'a', at: 0 },
135+
{ item: 'b', at: 100 },
136+
{ item: 'c', at: 200 },
137+
]);
138+
});
139+
140+
test('rateLimitedSource: validates quota', t => {
141+
const clock = makeFakeClock();
142+
for (const bad of [0, -1, 1.5, NaN, Infinity]) {
143+
t.throws(
144+
() =>
145+
rateLimitedSource({
146+
source: [],
147+
policy: { quota: bad, windowMs: 100 },
148+
powers: clock,
149+
}),
150+
{ instanceOf: RangeError, message: /quota/ },
151+
`quota=${bad}`,
152+
);
153+
}
154+
});
155+
156+
test('rateLimitedSource: validates windowMs', t => {
157+
const clock = makeFakeClock();
158+
for (const bad of [0, -1, NaN, Infinity]) {
159+
t.throws(
160+
() =>
161+
rateLimitedSource({
162+
source: [],
163+
policy: { quota: 1, windowMs: bad },
164+
powers: clock,
165+
}),
166+
{ instanceOf: RangeError, message: /windowMs/ },
167+
`windowMs=${bad}`,
168+
);
169+
}
170+
});
171+
172+
test('rateLimitedSource: composes with worker pool style consumers', async t => {
173+
const clock = makeFakeClock();
174+
const items = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
175+
const starts: Array<{ item: number; at: number }> = [];
176+
for await (const item of rateLimitedSource({
177+
source: items,
178+
policy: { quota: 4, windowMs: 60 },
179+
powers: clock,
180+
})) {
181+
starts.push({ item, at: clock.now() });
182+
clock.advance(80);
183+
}
184+
// Quota=4 in 60ms, but each "task" advances 80ms — so the window is always
185+
// already empty by the time we pull again. Every item starts immediately.
186+
t.deepEqual(
187+
starts.map(s => s.at),
188+
[0, 80, 160, 240, 320, 400, 480, 560, 640, 720, 800, 880],
189+
);
190+
});

0 commit comments

Comments
 (0)