Skip to content

Commit 8050602

Browse files
committed
PHP Pool to harvest and recreate instances after a certain amount of requests
This is an alternative to #990 that was not pursued
1 parent 32c0035 commit 8050602

File tree

2 files changed

+244
-0
lines changed

2 files changed

+244
-0
lines changed
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import { HarvestedInstancePool } from './harvested-instance-pool';
2+
3+
describe('ResourcePool', () => {
4+
describe('runs tasks as the resources are getting destroyed and replaced', async () => {
5+
describe('"concurrently"', async () => {
6+
it('maxInstances=1, maxTasksPerInstance=1', async () => {
7+
let id = 0;
8+
const pool = await HarvestedInstancePool.create({
9+
maxInstances: 1,
10+
maxTasksPerInstance: 1,
11+
create: () => 'resource' + ++id,
12+
});
13+
const runner1 = vitest.fn();
14+
const runner2 = vitest.fn();
15+
const runner3 = vitest.fn();
16+
await Promise.all([
17+
pool.runTask(runner1),
18+
pool.runTask(runner2),
19+
pool.runTask(runner3),
20+
]);
21+
expect(runner1).toHaveBeenCalledWith('resource1');
22+
expect(runner2).toHaveBeenCalledWith('resource2');
23+
expect(runner3).toHaveBeenCalledWith('resource3');
24+
});
25+
26+
it('maxInstances=2, maxTasksPerInstance=1', async () => {
27+
let id = 0;
28+
const pool = await HarvestedInstancePool.create({
29+
maxInstances: 2,
30+
maxTasksPerInstance: 2,
31+
create: () => 'resource' + ++id,
32+
});
33+
const runner1 = vitest.fn();
34+
const runner2 = vitest.fn();
35+
const runner3 = vitest.fn();
36+
const runner4 = vitest.fn();
37+
const runner5 = vitest.fn();
38+
await Promise.all([
39+
pool.runTask(runner1),
40+
pool.runTask(runner2),
41+
pool.runTask(runner3),
42+
pool.runTask(runner4),
43+
pool.runTask(runner5),
44+
]);
45+
expect(runner1).toHaveBeenCalledWith('resource1');
46+
expect(runner2).toHaveBeenCalledWith('resource2');
47+
expect(runner3).toHaveBeenCalledWith('resource1');
48+
expect(runner4).toHaveBeenCalledWith('resource2');
49+
expect(runner5).toHaveBeenCalledWith('resource3');
50+
});
51+
});
52+
describe('serially', async () => {
53+
it('maxInstances=1, maxTasksPerInstance=1', async () => {
54+
let id = 0;
55+
const pool = await HarvestedInstancePool.create({
56+
maxInstances: 1,
57+
maxTasksPerInstance: 1,
58+
create: () => 'resource' + ++id,
59+
});
60+
const runner1 = vitest.fn();
61+
const runner2 = vitest.fn();
62+
const runner3 = vitest.fn();
63+
await pool.runTask(runner1);
64+
await pool.runTask(runner2);
65+
await pool.runTask(runner3);
66+
expect(runner1).toHaveBeenCalledWith('resource1');
67+
expect(runner2).toHaveBeenCalledWith('resource2');
68+
expect(runner3).toHaveBeenCalledWith('resource3');
69+
});
70+
71+
it('maxInstances=2, maxTasksPerInstance=1', async () => {
72+
let id = 0;
73+
const pool = await HarvestedInstancePool.create({
74+
maxInstances: 2,
75+
maxTasksPerInstance: 2,
76+
create: () => 'resource' + ++id,
77+
});
78+
const runner1 = vitest.fn();
79+
const runner2 = vitest.fn();
80+
const runner3 = vitest.fn();
81+
const runner4 = vitest.fn();
82+
const runner5 = vitest.fn();
83+
await pool.runTask(runner1);
84+
await pool.runTask(runner2);
85+
await pool.runTask(runner3);
86+
await pool.runTask(runner4);
87+
await pool.runTask(runner5);
88+
expect(runner1).toHaveBeenCalledWith('resource1');
89+
expect(runner2).toHaveBeenCalledWith('resource2');
90+
expect(runner3).toHaveBeenCalledWith('resource1');
91+
// At this point, resource3 is the least utilized one
92+
// with 0 handled tasks – therefore it will be the
93+
// first one to be used.
94+
expect(runner4).toHaveBeenCalledWith('resource3');
95+
expect(runner5).toHaveBeenCalledWith('resource2');
96+
});
97+
});
98+
});
99+
100+
it('should destroy resources that have exceeded their tasks limit', async () => {
101+
const destroy = vitest.fn();
102+
const pool = await HarvestedInstancePool.create({
103+
maxInstances: 1,
104+
maxTasksPerInstance: 1,
105+
create: () => 'resource',
106+
destroy,
107+
});
108+
await pool.runTask(() => {});
109+
await pool.runTask(() => {});
110+
111+
expect(destroy).toHaveBeenCalledWith('resource');
112+
});
113+
});
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* Tracks stats of instances in a pool.
3+
* @private
4+
*/
5+
class HarvestedInstance<InstanceType> {
6+
/** The underlying resource. */
7+
instance: InstanceType;
8+
/** Total tasks processed. */
9+
handledTasks = 0;
10+
/** Whether instance is busy handling a request at the moment. */
11+
busy = false;
12+
constructor(instance: InstanceType) {
13+
this.instance = instance;
14+
}
15+
}
16+
17+
type InstanceLock<Instance> = {
18+
instance: HarvestedInstance<Instance>;
19+
release: () => Promise<void>;
20+
};
21+
22+
interface PoolOptions<Instance> {
23+
create: () => Promise<Instance> | Instance;
24+
destroy?: (instance: Instance) => Promise<void> | void;
25+
maxInstances: number;
26+
maxTasksPerInstance: number;
27+
}
28+
29+
/**
30+
* Maintains and refreshes a list of php instances such that each one will only
31+
* be fed X number of tasks before being discarded and replaced.
32+
*
33+
* Since we're dealing with a linear, "physical" memory array, as opposed to a
34+
* virtual memory system afforded by most modern OSes, we're prone to things
35+
* like memory fragmentation. In that situation, we could have the entire
36+
* gigabyte empty except for a few sparse allocations. If no contiguous region
37+
* of memory exists for the length requested, memory allocations will fail.
38+
* This tends to happen when a new request attempts to initialize a heap
39+
* structure but cannot find a contiguous 2mb chunk of memory.
40+
*
41+
* We can go as far as debugging PHP itself, and contributing the fix upstream.
42+
* But even in this case we cannot guarantee that a third party extension will
43+
* not introduce a leak sometime in the future. Therefore, we should have a
44+
* solution robust to memory leaks that come from upstream code. I think that
45+
* following the native strategy is the best way.
46+
*
47+
* https://www.php.net/manual/en/install.fpm.configuration.php#pm.max-tasks
48+
*/
49+
export class HarvestedInstancePool<InstanceType> {
50+
private pool: Set<HarvestedInstance<InstanceType>> = new Set();
51+
52+
/**
53+
* A queue of promises waiting to be resolved with an idle resource.
54+
*/
55+
private queue: Array<(lock: InstanceLock<InstanceType>) => any> = [];
56+
57+
public static async create<InstanceType>(
58+
options: PoolOptions<InstanceType>
59+
) {
60+
const pool = new HarvestedInstancePool<InstanceType>(options);
61+
await pool.populate();
62+
return pool;
63+
}
64+
private constructor(private options: PoolOptions<InstanceType>) {}
65+
66+
/**
67+
* Queue up a callback that will make a request when an
68+
* instance becomes idle.
69+
* @param item Callback to run when intance becomes available. Should accept the instance as the first and only param, and return a promise that resolves when the request is complete.
70+
* @public
71+
*/
72+
async runTask(callback: (php: InstanceType) => any) {
73+
// Wait for the next available instance.
74+
const { instance, release } = await new Promise<
75+
InstanceLock<InstanceType>
76+
>((resolve) => {
77+
this.queue.push(resolve);
78+
this.startNextTask();
79+
});
80+
81+
try {
82+
return await callback(instance.instance);
83+
} finally {
84+
await release();
85+
this.startNextTask();
86+
}
87+
}
88+
89+
private startNextTask() {
90+
if (this.queue.length === 0) {
91+
return;
92+
}
93+
const lock = this.lockIdleInstance();
94+
if (lock) {
95+
this.queue.shift()!(lock);
96+
}
97+
}
98+
99+
private lockIdleInstance() {
100+
// Find the first available idle instance.
101+
const instance = Array.from(this.pool)
102+
.filter((instance) => !instance.busy)
103+
.sort((a, b) => a.handledTasks - b.handledTasks)[0];
104+
105+
if (!instance) {
106+
// Bale out if all the instances are busy.
107+
// runTask() will call this method again when an instance becomes idle.
108+
return;
109+
}
110+
111+
instance.busy = true;
112+
const release = async () => {
113+
// Destroy the instance if it has exceeded its max tasks
114+
if (++instance.handledTasks >= this.options.maxTasksPerInstance) {
115+
await this.options.destroy?.(instance.instance);
116+
this.pool.delete(instance);
117+
await this.populate();
118+
}
119+
instance.busy = false;
120+
};
121+
return { instance, release };
122+
}
123+
124+
private async populate() {
125+
// Create new instances if we don't have enough.
126+
while (this.pool.size < this.options.maxInstances) {
127+
const instance = await this.options.create();
128+
this.pool.add(new HarvestedInstance(instance));
129+
}
130+
}
131+
}

0 commit comments

Comments
 (0)