Skip to content

Commit f750555

Browse files
committed
fix: await processEvent before acking RabbitMQ message to restore backpressure
Unawaited func() caused channel.ack() to fire synchronously before any HTTP requests were made, making AMQP_PREFETCH_MESSAGES completely ineffective and creating unbounded concurrent connections under load.
1 parent 1fc84ab commit f750555

2 files changed

Lines changed: 11 additions & 6 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ All configuration is done through environment variables. See `.env.sample` for a
291291
| `HTTP_TIMEOUT` | No | `1000` | Webhook HTTP client timeout in milliseconds |
292292
| `HTTP_MAX_REDIRECTS` | No | `0` | Max redirects followed when dispatching webhooks |
293293
| `DB_HEALTH_CHECK_TIMEOUT` | No | `5000` | Database health check timeout in milliseconds |
294-
| `AMQP_PREFETCH_MESSAGES` | No | `10` | RabbitMQ prefetch message count |
294+
| `AMQP_PREFETCH_MESSAGES` | No | `100` | RabbitMQ prefetch message count |
295295
| `WEBHOOK_AUTO_DISABLE` | No | `false` | Auto-disable webhooks that exceed the failure threshold |
296296
| `WEBHOOK_FAILURE_THRESHOLD` | No | `90` | Failure rate percentage (0–100) above which a webhook is auto-disabled |
297297
| `WEBHOOK_HEALTH_MINUTES_WINDOW` | No | `60` | Rolling window in minutes used to compute per-webhook failure rates |
@@ -316,4 +316,4 @@ This repository contains code developed under two different ownership and licens
316316

317317
Users who require a purely MIT-licensed codebase should base their work on the `sef-mit-final` tag. The historical MIT-licensed code remains MIT and is not retroactively relicensed.
318318

319-
For details, see `LICENSE` and `NOTICE`.
319+
For details, see `LICENSE` and `NOTICE`.

src/datasources/queue/queue.provider.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ export class QueueProvider implements OnApplicationShutdown {
6464
* at the same time
6565
*/
6666
getPrefetchMessages(): number {
67-
const value = Number(this.configService.get('AMQP_PREFETCH_MESSAGES', 10));
67+
const value = Number(this.configService.get('AMQP_PREFETCH_MESSAGES', 100));
6868
this.logger.log(`AMQP_PREFETCH_MESSAGES=${value}`);
6969
return value;
7070
}
@@ -149,10 +149,15 @@ export class QueueProvider implements OnApplicationShutdown {
149149
);
150150
const consumer = await channel.consume(
151151
this.getQueueName(),
152-
(message: ConsumeMessage) => {
152+
async (message: ConsumeMessage) => {
153153
if (message.content) {
154-
func(message.content.toString());
155-
channel.ack(message);
154+
try {
155+
await func(message.content.toString());
156+
} catch (error) {
157+
this.logger.error(`Error processing message: ${error.message}`);
158+
} finally {
159+
channel.ack(message);
160+
}
156161
}
157162
},
158163
{

0 commit comments

Comments
 (0)