Skip to content

feat(ingestion): Kafka (Azure Event Hubs) event transport behind KAFKA_BROKERS#337

Open
ayushjhanwar-png wants to merge 4 commits into
mainfrom
feat/kafka-event-hubs-ingestion
Open

feat(ingestion): Kafka (Azure Event Hubs) event transport behind KAFKA_BROKERS#337
ayushjhanwar-png wants to merge 4 commits into
mainfrom
feat/kafka-event-hubs-ingestion

Conversation

@ayushjhanwar-png

@ayushjhanwar-png ayushjhanwar-png commented Jun 25, 2026

Copy link
Copy Markdown

What & why

Adds an opt-in Kafka transport for the API→worker event handoff, alongside the existing GroupMQ/Redis path. Targets Azure Event Hubs (managed, Kafka-protocol). Goal: kill the GroupMQ stage/promoter/ORDERING_DELAY_MS silent-drift bug class and the unbounded Redis job: backlog (RAM → broker disk), with automatic consumer-group rebalancing.

Scope is narrow — only the transport changes. Everything downstream of incomingEvent() (session logic, Redis buffers, cron flush, ClickHouse, dashboard) is untouched.

API (PRODUCER) ──emit──► Event Hubs ──read──► Worker (CONSUMER) ──► incomingEvent() ──► [ UNCHANGED: buffer → cron → ClickHouse ]
   /track,/event          keyed by deviceId

The switch

  • KAFKA_BROKERS unset → byte-identical to today (GroupMQ). Zero risk until enabled.
  • KAFKA_BROKERS set → API produces to Kafka; worker skips GroupMQ shards and runs the Kafka consumer. Global on/off (matches upstream).

Files

New (Kafka feature):

  • packages/queue/src/kafka.ts — client: idempotent producer (fail-fast timeouts, fatal-PID recovery), consumer factory, env-gated SASL/SSL.
  • apps/worker/src/jobs/events.kafka-consumer.ts — batch grouped by key (serial per device, parallel across), strict ascending offset resolution, rebalance/crash logging, duplicate detector.
  • packages/db/scripts/find-duplicate-events.ts — CH diagnostic to find/clean at-least-once duplicate rows.

Wiring: track.controller.ts, event.controller.ts (producer branch), boot-workers.ts (skip GroupMQ + start consumer via handlesEvents gate so cron/import pods never double-consume), metrics.ts (kafka_reprocessed_total), graceful-shutdown.ts (disconnect producer on API rollout so in-flight produces flush), queue index.ts/package.json, .env.example.

Incidental (separate commit): 5 SDK package.json stale-pin fixes (workspace:1.x-localworkspace:*) — pre-existing, required to unblock pnpm install.

Config (set in shared configmap; password in secret)

KAFKA_BROKERS=<ns>.servicebus.windows.net:9093
KAFKA_EVENTS_TOPIC=events
KAFKA_CONSUMER_GROUP=$Default
KAFKA_SSL=true
KAFKA_SASL_MECHANISM=plain
KAFKA_SASL_USERNAME=$ConnectionString
KAFKA_SASL_PASSWORD=<connection string>
KAFKA_CONNECTION_TIMEOUT_MS=10000   # 2s default too short for Event Hubs TLS+SASL

Testing

Validated end-to-end against real Azure Event Hubs (reels-dev) + dev infra:

  • ✅ SASL/SSL auth + idempotent producer works on Event Hubs (the key compatibility risk)
  • ✅ Real worker consumer joins $Default group, skips GroupMQ
  • ✅ Injected events flowed Event Hubs → worker → dev Redis buffer → dev ClickHouse (rows verified)

Rollout (prod)

  1. Provision prod Event Hubs (Standard 20-TU or Premium), hub with 32 partitions, 3-day retention.
  2. Deploy this with KAFKA_BROKERS unset (no behavior change) to de-risk the code deploy.
  3. Low-traffic window: drain GroupMQ to ~0, set KAFKA_*, rollout restart api + worker + worker-cron + worker-import.
  4. Watch: consumer lag (flat), CH max(created_at) freshness, kafka_reprocessed_total=0, EH throttling, Redis memory drop.
  5. Rollback: empty KAFKA_BROKERS + restart → instant GroupMQ fallback. No data loss (both transports persist).

Note on CI

origin/main already fails @openpanel/db typecheck (26 pre-existing errors in db/trpc, unrelated to this PR). The Kafka code itself typechecks clean. Pushed with SKIP_HOOKS=1 for that reason.

Summary by CodeRabbit

  • New Features

    • Added Kafka-based event ingestion support, including configuration options for brokers, topic, consumer group, and optional security settings.
    • Introduced a utility to find and optionally remove duplicate event records.
  • Bug Fixes

    • Improved event processing reliability with safer queue handling, partition-based ordering, and better shutdown behavior.
    • Added monitoring for Kafka redeliveries to help detect repeated message processing.

The SDK sub-packages pinned exact local versions (workspace:1.0.3-local /
workspace:1.1.0-local) that no longer exist after web/sdk version bumps, which
made a fresh `pnpm install` fail. Switch to `workspace:*` (the publish-safe
form pnpm rewrites to the resolved version at pack time).
…KAFKA_BROKERS

Adds an opt-in Kafka path for the API->worker event handoff, alongside the
existing GroupMQ/Redis transport. Ported from upstream (cd3bef8 + hardening
fixes 076f574, bedb92d, 97d335d) and adapted to this fork.

When KAFKA_BROKERS is set:
 - API (/track, /event) produces events to Kafka instead of GroupMQ, keyed by
   groupId (deviceId / projectId:profileId) so per-device ordering is preserved.
 - Worker skips the GroupMQ event shards and runs a Kafka consumer that calls
   the unchanged incomingEvent() handler. Everything downstream (buffers, cron
   flush, ClickHouse) is untouched.
When KAFKA_BROKERS is unset, behaviour is byte-identical to today (GroupMQ).

Hardening included from day one:
 - idempotent producer, maxInFlightRequests=1, fatal-PID recovery
 - fail-fast producer timeouts (req 5s, conn-tunable) so a broker outage can't
   park /track requests and saturate the LB
 - consumer: batch grouped by key (serial per device, parallel across), strict
   ascending offset resolution (no duplicate-causing rewind), rebalance/crash
   logging, kafka_reprocessed_total duplicate detector
 - SASL/SSL support (env-gated) for managed brokers like Azure Event Hubs
 - graceful-shutdown disconnects the producer so an API rollout flushes in-flight
 - find-duplicate-events.ts script to detect/clean at-least-once duplicates

The consumer only starts on event-handling pods (handlesEvents gate), so
cron/import worker deployments never double-consume when KAFKA_BROKERS is in a
shared configmap.

Validated end-to-end against real Azure Event Hubs + dev infra (auth, idempotent
producer, consumer group join, event -> ClickHouse).
@coderabbitai

coderabbitai Bot commented Jun 30, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

Adds Kafka as an optional event ingestion transport alongside the existing GroupMQ shards. The packages/queue package gains a full Kafka client layer (kafka.ts). API controllers conditionally route incoming events to Kafka or the existing queue. The worker can start a Kafka consumer instead of GroupMQ shard workers. Graceful shutdown disconnects Kafka. A new ClickHouse script finds and optionally deletes duplicate events.

Changes

Kafka Event Ingestion Transport

Layer / File(s) Summary
Kafka client library
packages/queue/src/kafka.ts, packages/queue/index.ts, packages/queue/package.json
Defines all Kafka configuration constants from env vars, singleton Kafka client and idempotent producer, SASL/TLS support, fatal error detection with producer reset, produceIncomingEvent, createKafkaEventsConsumer, and disconnectKafka; re-exported from the package entrypoint with kafkajs added as a dependency.
API controller conditional routing
apps/api/src/controllers/event.controller.ts, apps/api/src/controllers/track.controller.ts
Both controllers now build a typed EventsQueuePayloadIncomingEvent['payload'] object, compute a partitionKey (falling back to generateId()), and conditionally call produceIncomingEvent when shouldUseKafka() is true, or enqueue to the GroupMQ shard otherwise.
API graceful shutdown
apps/api/src/utils/graceful-shutdown.ts
Imports disconnectKafka and adds a shutdown step that awaits Kafka producer/consumer disconnect with success/error logging.
Worker Kafka consumer
apps/worker/src/jobs/events.kafka-consumer.ts, apps/worker/src/metrics.ts
Implements startKafkaEventsConsumer with lifecycle logging, per-partition-key HWM duplicate detection, concurrent batch processing grouped by key, incomingEvent invocation, periodic heartbeats, strict ascending manual offset resolution, and a stop() handle. Adds kafkaReprocessedTotal Prometheus counter.
Worker boot wiring
apps/worker/src/boot-workers.ts
bootWorkers() computes handlesEvents, skips GroupMQ shard startup when isKafkaConfigured() is true, starts the Kafka consumer asynchronously, registers extraStops shutdown callbacks, and concurrently closes workers and extra stops in exitHandler.
ClickHouse duplicate-events script
packages/db/scripts/find-duplicate-events.ts, packages/db/package.json
New CLI script with --since, --bucket, --project, --limit, --batch, and --danger-yes-delete flags; reports duplicate event counts bucketed by day/hour using a content hash (DEDUP_KEY); deletion mode uses row_number() over the hash to select extra ids and submits batched ALTER TABLE ... DELETE mutations.
Env config and SDK version relaxation
.env.example, packages/sdks/*/package.json
Documents KAFKA_BROKERS, KAFKA_EVENTS_TOPIC, KAFKA_CONSUMER_GROUP, and optional SSL/SASL vars in .env.example; adds CLICKHOUSE_QUERY_LIMIT stub; relaxes five SDK package workspace dependency versions from pinned local versions to workspace:*.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant APIController as event/track Controller
  participant KafkaProducer as produceIncomingEvent
  participant GroupMQ as getEventsGroupQueueShard
  participant KafkaConsumer as startKafkaEventsConsumer
  participant incomingEvent

  Client->>APIController: POST event/track
  APIController->>APIController: build queueData + partitionKey
  alt shouldUseKafka()
    APIController->>KafkaProducer: produceIncomingEvent(queueData, partitionKey)
    KafkaProducer-->>APIController: ack
    KafkaConsumer->>KafkaConsumer: eachBatch – group by key, resolve offsets
    KafkaConsumer->>incomingEvent: incomingEvent(payload)
  else GroupMQ path
    APIController->>GroupMQ: getEventsGroupQueueShard(partitionKey).add(queueData)
    GroupMQ-->>APIController: queued
    GroupMQ->>incomingEvent: process job
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐇 Hoppity-hop through the broker lane,
Messages keyed with a partition refrain.
Fatal errors? Reset and retry!
Offsets resolved so no dupes sneak by.
The rabbit rejoices — Kafka's alive! 🎉

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 26.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly summarizes the main change: an opt-in Kafka event transport behind KAFKA_BROKERS, including Azure Event Hubs support.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/kafka-event-hubs-ingestion

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@apps/worker/src/boot-workers.ts`:
- Around line 231-246: The Kafka consumer startup failure is being logged and
swallowed in the startKafkaEventsConsumer flow, which allows boot to continue
with no event consumer after GroupMQ workers were skipped. Update the boot path
in boot-workers.ts so the failure from startKafkaEventsConsumer propagates and
aborts startup instead of only logging in the catch, and keep the cleanup logic
in the extraStops handle path compatible with the failure case.

In `@apps/worker/src/jobs/events.kafka-consumer.ts`:
- Around line 162-178: The Kafka consumer in incomingEvent handling is
swallowing handler exceptions and still marking the message processed, which can
cause lost events. In events.kafka-consumer.ts, update the try/catch around
incomingEvent(payload) so failed handlers are not acknowledged: either rethrow
to fail the batch before processed.add(m.offset) and the offset resolution path,
or send the payload through a DLQ and only then resolve it. Keep the
logger.error call in the handler path, but ensure the flow in the batch
processing logic does not advance offsets for failed records.

In `@packages/db/scripts/find-duplicate-events.ts`:
- Line 26: The documented default for the --batch option is out of sync with
parseArgs in find-duplicate-events.ts. Update the help text near the CLI usage
comment to match the actual default used by parseArgs() so the --batch default
is consistent everywhere; keep the documented value aligned with the
implementation’s current default.
- Around line 78-91: `parseArgs()` currently accepts non-integer `--limit` and
`--batch` values, which can later produce invalid SQL in the duplicate-events
script. Update `parseArgs()` in `find-duplicate-events.ts` to validate both
values before returning the `Args` object: parse them as integers, reject `NaN`
or non-positive values with a clear error, and keep the existing
`bucket`/`since` handling unchanged. Make sure the checks cover the values used
by the SQL-building logic so `LIMIT` and the `id IN (...)` batch size are always
numeric and valid.
- Around line 243-258: The bucketRanges() generator is expanding the first
deletion bucket earlier than the requested since boundary by flooring to the
start of the day/hour. Update bucketRanges() and its delete-mode caller in
find-duplicate-events.ts so ranges are never widened before args.since: keep the
first bucket start at since (or clamp yielded start to since) while still
aligning only subsequent buckets to the bucket size. Ensure report mode can
continue using the exact args.since boundary and delete mode only affects events
within the requested window.

In `@packages/queue/src/kafka.ts`:
- Around line 216-230: The Kafka path in produceIncomingEvent currently
serializes only the payload, which drops the existing queue metadata contract
used by the GroupMQ branches. Update the Kafka message format to carry the same
metadata fields (groupId, jobId, and orderMs), either in an envelope or message
headers, and adjust the consumer side that reads from KAFKA_EVENTS_TOPIC to
extract those fields before calling incomingEvent(). Keep the change localized
around produceIncomingEvent and the Kafka worker/consumer handling so the
metadata survives end-to-end.
- Around line 261-280: disconnectKafka currently only disconnects producer when
producer is already set, so an in-flight getProducer()/p.connect() can finish
after shutdown and leave the producer connected. Update disconnectKafka and the
producer lifecycle in kafka.ts so shutdown also handles producerConnectPromise
(or the pending connect path) by awaiting/canceling any in-flight connect before
returning, then disconnecting the resulting producer if it resolves; use the
existing disconnectKafka, getProducer, producer, and producerConnectPromise
symbols to place the fix.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 473aa568-18fd-46c7-80be-e9395292126e

📥 Commits

Reviewing files that changed from the base of the PR and between e12db93 and e295413.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (17)
  • .env.example
  • apps/api/src/controllers/event.controller.ts
  • apps/api/src/controllers/track.controller.ts
  • apps/api/src/utils/graceful-shutdown.ts
  • apps/worker/src/boot-workers.ts
  • apps/worker/src/jobs/events.kafka-consumer.ts
  • apps/worker/src/metrics.ts
  • packages/db/package.json
  • packages/db/scripts/find-duplicate-events.ts
  • packages/queue/index.ts
  • packages/queue/package.json
  • packages/queue/src/kafka.ts
  • packages/sdks/astro/package.json
  • packages/sdks/express/package.json
  • packages/sdks/nextjs/package.json
  • packages/sdks/react-native/package.json
  • packages/sdks/web/package.json

Comment on lines +231 to +246
if (isKafkaConfigured() && handlesEvents) {
let handle: KafkaConsumerHandle | null = null;
const startPromise = startKafkaEventsConsumer()
.then((h) => {
handle = h;
logger.info('Started Kafka events consumer');
})
.catch((err) => {
logger.error('Failed to start Kafka events consumer', { err });
});
extraStops.push(async () => {
await startPromise.catch(() => undefined);
if (handle) {
await handle.stop();
}
});

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Fail boot when the Kafka consumer cannot start.

Line 238 logs and swallows startup failure after GroupMQ event workers have already been skipped on Lines 185-187. That can leave the events pod alive with no event consumer and no fallback path.

Proposed fix
   if (isKafkaConfigured() && handlesEvents) {
-    let handle: KafkaConsumerHandle | null = null;
-    const startPromise = startKafkaEventsConsumer()
-      .then((h) => {
-        handle = h;
-        logger.info('Started Kafka events consumer');
-      })
-      .catch((err) => {
-        logger.error('Failed to start Kafka events consumer', { err });
-      });
-    extraStops.push(async () => {
-      await startPromise.catch(() => undefined);
-      if (handle) {
-        await handle.stop();
-      }
-    });
+    try {
+      const handle = await startKafkaEventsConsumer();
+      logger.info('Started Kafka events consumer');
+      extraStops.push(() => handle.stop());
+    } catch (err) {
+      logger.error('Failed to start Kafka events consumer', { err });
+      throw err;
+    }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (isKafkaConfigured() && handlesEvents) {
let handle: KafkaConsumerHandle | null = null;
const startPromise = startKafkaEventsConsumer()
.then((h) => {
handle = h;
logger.info('Started Kafka events consumer');
})
.catch((err) => {
logger.error('Failed to start Kafka events consumer', { err });
});
extraStops.push(async () => {
await startPromise.catch(() => undefined);
if (handle) {
await handle.stop();
}
});
if (isKafkaConfigured() && handlesEvents) {
try {
const handle = await startKafkaEventsConsumer();
logger.info('Started Kafka events consumer');
extraStops.push(() => handle.stop());
} catch (err) {
logger.error('Failed to start Kafka events consumer', { err });
throw err;
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/worker/src/boot-workers.ts` around lines 231 - 246, The Kafka consumer
startup failure is being logged and swallowed in the startKafkaEventsConsumer
flow, which allows boot to continue with no event consumer after GroupMQ workers
were skipped. Update the boot path in boot-workers.ts so the failure from
startKafkaEventsConsumer propagates and aborts startup instead of only logging
in the catch, and keep the cleanup logic in the extraStops handle path
compatible with the failure case.

Comment on lines +162 to +178
try {
await incomingEvent(payload);
} catch (err) {
// Match the GroupMQ behaviour: log and ack. At-most-once on
// handler exceptions; failures here would otherwise block the
// partition.
logger.error('kafka incomingEvent handler failed', {
error: err,
partition: batch.partition,
offset: m.offset,
projectId: payload.projectId,
});
}
}
}

processed.add(m.offset);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Do not acknowledge handler failures.

Line 164 catches incomingEvent() failures, then Line 178 marks the message processed and Lines 191-195 can resolve the offset. That drops events permanently on transient downstream failures; let the batch fail or route the payload to a DLQ before resolving.

Proposed fix
                 try {
                   await incomingEvent(payload);
                 } catch (err) {
-                  // Match the GroupMQ behaviour: log and ack. At-most-once on
-                  // handler exceptions; failures here would otherwise block the
-                  // partition.
                   logger.error('kafka incomingEvent handler failed', {
                     error: err,
                     partition: batch.partition,
                     offset: m.offset,
                     projectId: payload.projectId,
                   });
+                  throw err;
                 }

Also applies to: 191-195

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/worker/src/jobs/events.kafka-consumer.ts` around lines 162 - 178, The
Kafka consumer in incomingEvent handling is swallowing handler exceptions and
still marking the message processed, which can cause lost events. In
events.kafka-consumer.ts, update the try/catch around incomingEvent(payload) so
failed handlers are not acknowledged: either rethrow to fail the batch before
processed.add(m.offset) and the offset resolution path, or send the payload
through a DLQ and only then resolve it. Keep the logger.error call in the
handler path, but ensure the flow in the batch processing logic does not advance
offsets for failed records.

* --bucket=<day|hour> Reporting granularity (default: day)
* --project=<projectId> Restrict to a single project (default: all)
* --limit=<n> Max example duplicate groups to print (default: 20)
* --batch=<n> Ids per DELETE statement when deleting (default: 10000)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win

Sync the documented --batch default with the implementation.

Line 26 says 10000, but parseArgs() defaults to 5000 on Line 90.

Proposed fix
- *   --batch=<n>            Ids per DELETE statement when deleting (default: 10000)
+ *   --batch=<n>            Ids per DELETE statement when deleting (default: 5000)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
* --batch=<n> Ids per DELETE statement when deleting (default: 10000)
* --batch=<n> Ids per DELETE statement when deleting (default: 5000)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/db/scripts/find-duplicate-events.ts` at line 26, The documented
default for the --batch option is out of sync with parseArgs in
find-duplicate-events.ts. Update the help text near the CLI usage comment to
match the actual default used by parseArgs() so the --batch default is
consistent everywhere; keep the documented value aligned with the
implementation’s current default.

Comment on lines +78 to +91
function parseArgs(): Args {
const bucket = (getArg('bucket') ?? 'day') as Bucket;
if (bucket !== 'day' && bucket !== 'hour') {
throw new Error(`Invalid --bucket="${bucket}". Use day or hour.`);
}
return {
since: parseSince(getArg('since') ?? '24h'),
bucket,
project: getArg('project') ?? null,
limit: Number.parseInt(getArg('limit') ?? '20', 10),
// Ids are inlined into `id IN (...)`. ~39 bytes/uuid, so 5000 ≈ 195KB stays
// safely under ClickHouse's default max_query_size (256KB).
batch: Number.parseInt(getArg('batch') ?? '5000', 10),
delete: hasFlag('danger-yes-delete'),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win

Validate numeric CLI arguments before building SQL.

--limit=abc emits LIMIT NaN, and invalid --batch values can build malformed id IN () delete statements. Reject non-integer values up front.

Proposed fix
+function parseIntegerArg(name: string, fallback: string, min: number): number {
+  const raw = getArg(name) ?? fallback;
+  if (!/^\d+$/.test(raw)) {
+    throw new Error(`Invalid --${name}="${raw}". Use an integer >= ${min}.`);
+  }
+  const value = Number.parseInt(raw, 10);
+  if (!Number.isSafeInteger(value) || value < min) {
+    throw new Error(`Invalid --${name}="${raw}". Use an integer >= ${min}.`);
+  }
+  return value;
+}
+
 function parseArgs(): Args {
   const bucket = (getArg('bucket') ?? 'day') as Bucket;
   if (bucket !== 'day' && bucket !== 'hour') {
     throw new Error(`Invalid --bucket="${bucket}". Use day or hour.`);
   }
   return {
     since: parseSince(getArg('since') ?? '24h'),
     bucket,
     project: getArg('project') ?? null,
-    limit: Number.parseInt(getArg('limit') ?? '20', 10),
+    limit: parseIntegerArg('limit', '20', 0),
     // Ids are inlined into `id IN (...)`. ~39 bytes/uuid, so 5000 ≈ 195KB stays
     // safely under ClickHouse's default max_query_size (256KB).
-    batch: Number.parseInt(getArg('batch') ?? '5000', 10),
+    batch: parseIntegerArg('batch', '5000', 1),
     delete: hasFlag('danger-yes-delete'),
   };
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
function parseArgs(): Args {
const bucket = (getArg('bucket') ?? 'day') as Bucket;
if (bucket !== 'day' && bucket !== 'hour') {
throw new Error(`Invalid --bucket="${bucket}". Use day or hour.`);
}
return {
since: parseSince(getArg('since') ?? '24h'),
bucket,
project: getArg('project') ?? null,
limit: Number.parseInt(getArg('limit') ?? '20', 10),
// Ids are inlined into `id IN (...)`. ~39 bytes/uuid, so 5000 ≈ 195KB stays
// safely under ClickHouse's default max_query_size (256KB).
batch: Number.parseInt(getArg('batch') ?? '5000', 10),
delete: hasFlag('danger-yes-delete'),
function parseIntegerArg(name: string, fallback: string, min: number): number {
const raw = getArg(name) ?? fallback;
if (!/^\d+$/.test(raw)) {
throw new Error(`Invalid --${name}="${raw}". Use an integer >= ${min}.`);
}
const value = Number.parseInt(raw, 10);
if (!Number.isSafeInteger(value) || value < min) {
throw new Error(`Invalid --${name}="${raw}". Use an integer >= ${min}.`);
}
return value;
}
function parseArgs(): Args {
const bucket = (getArg('bucket') ?? 'day') as Bucket;
if (bucket !== 'day' && bucket !== 'hour') {
throw new Error(`Invalid --bucket="${bucket}". Use day or hour.`);
}
return {
since: parseSince(getArg('since') ?? '24h'),
bucket,
project: getArg('project') ?? null,
limit: parseIntegerArg('limit', '20', 0),
// Ids are inlined into `id IN (...)`. ~39 bytes/uuid, so 5000 ≈ 195KB stays
// safely under ClickHouse's default max_query_size (256KB).
batch: parseIntegerArg('batch', '5000', 1),
delete: hasFlag('danger-yes-delete'),
};
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/db/scripts/find-duplicate-events.ts` around lines 78 - 91,
`parseArgs()` currently accepts non-integer `--limit` and `--batch` values,
which can later produce invalid SQL in the duplicate-events script. Update
`parseArgs()` in `find-duplicate-events.ts` to validate both values before
returning the `Args` object: parse them as integers, reject `NaN` or
non-positive values with a clear error, and keep the existing `bucket`/`since`
handling unchanged. Make sure the checks cover the values used by the
SQL-building logic so `LIMIT` and the `id IN (...)` batch size are always
numeric and valid.

Comment on lines +243 to +258
function* bucketRanges(
since: Date,
bucket: Bucket
): Generator<{ start: Date; end: Date }> {
const stepMs = bucket === 'day' ? 86_400_000 : 3600_000;
// Align the first bucket to the start of the day/hour.
const start = new Date(since);
if (bucket === 'day') {
start.setUTCHours(0, 0, 0, 0);
} else {
start.setUTCMinutes(0, 0, 0);
}
const now = Date.now();
for (let t = start.getTime(); t < now; t += stepMs) {
yield { start: new Date(t), end: new Date(t + stepMs) };
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Don’t expand delete ranges before --since.

bucketRanges() floors the first range to the start of the day/hour, while report mode uses the exact args.since. In delete mode, --since=2026-06-01T12:00:00Z --bucket=day can delete duplicates from 2026-06-01 00:00:00 through noon, outside the requested window.

Proposed fix
 function* bucketRanges(
   since: Date,
   bucket: Bucket
 ): Generator<{ start: Date; end: Date }> {
   const stepMs = bucket === 'day' ? 86_400_000 : 3600_000;
   // Align the first bucket to the start of the day/hour.
-  const start = new Date(since);
+  const start = new Date(since);
   if (bucket === 'day') {
     start.setUTCHours(0, 0, 0, 0);
   } else {
     start.setUTCMinutes(0, 0, 0);
   }
   const now = Date.now();
   for (let t = start.getTime(); t < now; t += stepMs) {
-    yield { start: new Date(t), end: new Date(t + stepMs) };
+    const startMs = Math.max(t, since.getTime());
+    const endMs = Math.min(t + stepMs, now);
+    if (startMs < endMs) {
+      yield { start: new Date(startMs), end: new Date(endMs) };
+    }
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
function* bucketRanges(
since: Date,
bucket: Bucket
): Generator<{ start: Date; end: Date }> {
const stepMs = bucket === 'day' ? 86_400_000 : 3600_000;
// Align the first bucket to the start of the day/hour.
const start = new Date(since);
if (bucket === 'day') {
start.setUTCHours(0, 0, 0, 0);
} else {
start.setUTCMinutes(0, 0, 0);
}
const now = Date.now();
for (let t = start.getTime(); t < now; t += stepMs) {
yield { start: new Date(t), end: new Date(t + stepMs) };
}
function* bucketRanges(
since: Date,
bucket: Bucket
): Generator<{ start: Date; end: Date }> {
const stepMs = bucket === 'day' ? 86_400_000 : 3600_000;
// Align the first bucket to the start of the day/hour.
const start = new Date(since);
if (bucket === 'day') {
start.setUTCHours(0, 0, 0, 0);
} else {
start.setUTCMinutes(0, 0, 0);
}
const now = Date.now();
for (let t = start.getTime(); t < now; t += stepMs) {
const startMs = Math.max(t, since.getTime());
const endMs = Math.min(t + stepMs, now);
if (startMs < endMs) {
yield { start: new Date(startMs), end: new Date(endMs) };
}
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/db/scripts/find-duplicate-events.ts` around lines 243 - 258, The
bucketRanges() generator is expanding the first deletion bucket earlier than the
requested since boundary by flooring to the start of the day/hour. Update
bucketRanges() and its delete-mode caller in find-duplicate-events.ts so ranges
are never widened before args.since: keep the first bucket start at since (or
clamp yielded start to since) while still aligning only subsequent buckets to
the bucket size. Ensure report mode can continue using the exact args.since
boundary and delete mode only affects events within the requested window.

Comment on lines +216 to +230
export const produceIncomingEvent = async (
payload: EventsQueuePayloadIncomingEvent['payload'],
partitionKey: string,
): Promise<void> => {
const p = await getProducer();
try {
await p.send({
topic: KAFKA_EVENTS_TOPIC,
timeout: KAFKA_REQUEST_TIMEOUT_MS,
messages: [
{
key: Buffer.from(partitionKey),
value: Buffer.from(JSON.stringify(payload)),
},
],

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift

Preserve the existing queue metadata on the Kafka path.

The GroupMQ branches still pass groupId, jobId, and orderMs, but the Kafka message only contains payload. That drops the existing dedupe/order metadata contract for Kafka-enabled ingestion.

Consider sending an envelope or headers containing the same metadata and have the worker consume it before calling incomingEvent().

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/queue/src/kafka.ts` around lines 216 - 230, The Kafka path in
produceIncomingEvent currently serializes only the payload, which drops the
existing queue metadata contract used by the GroupMQ branches. Update the Kafka
message format to carry the same metadata fields (groupId, jobId, and orderMs),
either in an envelope or message headers, and adjust the consumer side that
reads from KAFKA_EVENTS_TOPIC to extract those fields before calling
incomingEvent(). Keep the change localized around produceIncomingEvent and the
Kafka worker/consumer handling so the metadata survives end-to-end.

Comment on lines +261 to +280
export const disconnectKafka = async (): Promise<void> => {
const tasks: Promise<unknown>[] = [];
for (const c of consumers) {
tasks.push(
c.disconnect().catch((err) => {
kafkaLogger.error('kafka consumer disconnect error', { err });
}),
);
}
consumers.clear();
if (producer) {
const p = producer;
producer = null;
producerConnectPromise = null;
tasks.push(
p.disconnect().catch((err) => {
kafkaLogger.error('kafka producer disconnect error', { err });
}),
);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Disconnect an in-flight producer during shutdown.

If shutdown starts while getProducer() is still awaiting p.connect(), producer is still null, so disconnectKafka() returns without disconnecting the producer that may connect moments later.

Proposed fix
 export const disconnectKafka = async (): Promise<void> => {
   const tasks: Promise<unknown>[] = [];
   for (const c of consumers) {
     tasks.push(
       c.disconnect().catch((err) => {
         kafkaLogger.error('kafka consumer disconnect error', { err });
       }),
     );
   }
   consumers.clear();
-  if (producer) {
-    const p = producer;
-    producer = null;
-    producerConnectPromise = null;
+  const p = producer;
+  const pendingProducer = producerConnectPromise;
+  producer = null;
+  producerConnectPromise = null;
+  if (p) {
     tasks.push(
       p.disconnect().catch((err) => {
         kafkaLogger.error('kafka producer disconnect error', { err });
       }),
     );
+  } else if (pendingProducer) {
+    tasks.push(
+      pendingProducer
+        .then((connectedProducer) => connectedProducer.disconnect())
+        .catch((err) => {
+          kafkaLogger.error('kafka producer connect/disconnect error', { err });
+        }),
+    );
   }
   await Promise.all(tasks);
 };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export const disconnectKafka = async (): Promise<void> => {
const tasks: Promise<unknown>[] = [];
for (const c of consumers) {
tasks.push(
c.disconnect().catch((err) => {
kafkaLogger.error('kafka consumer disconnect error', { err });
}),
);
}
consumers.clear();
if (producer) {
const p = producer;
producer = null;
producerConnectPromise = null;
tasks.push(
p.disconnect().catch((err) => {
kafkaLogger.error('kafka producer disconnect error', { err });
}),
);
}
export const disconnectKafka = async (): Promise<void> => {
const tasks: Promise<unknown>[] = [];
for (const c of consumers) {
tasks.push(
c.disconnect().catch((err) => {
kafkaLogger.error('kafka consumer disconnect error', { err });
}),
);
}
consumers.clear();
const p = producer;
const pendingProducer = producerConnectPromise;
producer = null;
producerConnectPromise = null;
if (p) {
tasks.push(
p.disconnect().catch((err) => {
kafkaLogger.error('kafka producer disconnect error', { err });
}),
);
} else if (pendingProducer) {
tasks.push(
pendingProducer
.then((connectedProducer) => connectedProducer.disconnect())
.catch((err) => {
kafkaLogger.error('kafka producer connect/disconnect error', { err });
}),
);
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/queue/src/kafka.ts` around lines 261 - 280, disconnectKafka
currently only disconnects producer when producer is already set, so an
in-flight getProducer()/p.connect() can finish after shutdown and leave the
producer connected. Update disconnectKafka and the producer lifecycle in
kafka.ts so shutdown also handles producerConnectPromise (or the pending connect
path) by awaiting/canceling any in-flight connect before returning, then
disconnecting the resulting producer if it resolves; use the existing
disconnectKafka, getProducer, producer, and producerConnectPromise symbols to
place the fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants