Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,26 @@ API_PORT=3333
# /agent endpoints — bearer token allow-list (comma-separated, one per agent)
AGENT_API_KEYS=""

# KAFKA (event ingestion transport)
# Leave KAFKA_BROKERS empty to use the default GroupMQ/Redis path (no change).
# Set it to switch event ingestion to Kafka (producer + worker consumer).
#
# Local Redpanda (docker-compose, no auth):
# KAFKA_BROKERS="localhost:19092"
#
# Azure Event Hubs (managed, Standard+ tier; pre-create the `events` hub):
# KAFKA_BROKERS="<namespace>.servicebus.windows.net:9093"
# KAFKA_SSL="true"
# KAFKA_SASL_MECHANISM="plain"
# KAFKA_SASL_USERNAME="$ConnectionString"
# KAFKA_SASL_PASSWORD="Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=..."
KAFKA_BROKERS=""
KAFKA_EVENTS_TOPIC="events"
KAFKA_CONSUMER_GROUP="openpanel-events"
# KAFKA_SSL=""
# KAFKA_SASL_MECHANISM=""
# KAFKA_SASL_USERNAME=""
# KAFKA_SASL_PASSWORD=""
# DASHBOARD CHART CACHE
# TTL (seconds) for the server-side Redis cache of chart/funnel/conversion
# queries. Default 3600 (1h). Set to 0 to disable caching.
Expand All @@ -26,4 +46,4 @@ ENABLE_TRPC_CACHE=""
# Optional: raise ClickHouse `max_concurrent_queries_for_user` for the app's CH
# user so a dashboard's cold-load / Reload burst doesn't exceed CH's default of
# 10 ("Too many simultaneous queries"). Unset = leave CH's default untouched.
CLICKHOUSE_QUERY_LIMIT=""
CLICKHOUSE_QUERY_LIMIT=""
49 changes: 31 additions & 18 deletions apps/api/src/controllers/event.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ import type { FastifyReply, FastifyRequest } from 'fastify';

import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
import { getSalts } from '@openpanel/db';
import { getEventsGroupQueueShard } from '@openpanel/queue';
import {
type EventsQueuePayloadIncomingEvent,
getEventsGroupQueueShard,
produceIncomingEvent,
shouldUseKafka,
} from '@openpanel/queue';
import type { PostEventPayload } from '@openpanel/sdk';

import { generateId, slug } from '@openpanel/common';
Expand Down Expand Up @@ -62,24 +67,32 @@ export async function postEvent(
]
.filter(Boolean)
.join('-');
await getEventsGroupQueueShard(groupId).add({
orderMs: new Date(timestamp).getTime(),
data: {
projectId,
headers,
event: {
...request.body,
timestamp,
isTimestampFromThePast,
},
uaInfo,
geo,
currentDeviceId,
previousDeviceId,
const queueData: EventsQueuePayloadIncomingEvent['payload'] = {
projectId,
headers,
event: {
...request.body,
timestamp,
isTimestampFromThePast,
},
groupId,
jobId,
});
uaInfo,
geo,
currentDeviceId,
previousDeviceId,
};

const partitionKey = groupId || generateId();

if (shouldUseKafka()) {
await produceIncomingEvent(queueData, partitionKey);
} else {
await getEventsGroupQueueShard(partitionKey).add({
orderMs: new Date(timestamp).getTime(),
data: queueData,
groupId,
jobId,
});
}

reply.status(202).send('ok');
}
52 changes: 34 additions & 18 deletions apps/api/src/controllers/track.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ import {
upsertProfile,
} from '@openpanel/db';
import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
import { getEventsGroupQueueShard, getQueueName } from '@openpanel/queue';
import {
type EventsQueuePayloadIncomingEvent,
getEventsGroupQueueShard,
getQueueName,
produceIncomingEvent,
shouldUseKafka,
} from '@openpanel/queue';
import { getRedisCache } from '@openpanel/redis';
import type {
DecrementPayload,
Expand Down Expand Up @@ -321,24 +327,34 @@ async function track({
]
.filter(Boolean)
.join('-');
await getEventsGroupQueueShard(groupId).add({
orderMs: timestamp,
data: {
projectId,
headers,
event: {
...payload,
timestamp,
isTimestampFromThePast,
},
uaInfo,
geo,
currentDeviceId,
previousDeviceId,
const queueData: EventsQueuePayloadIncomingEvent['payload'] = {
projectId,
headers,
event: {
...payload,
timestamp,
isTimestampFromThePast,
},
groupId,
jobId,
});
uaInfo,
geo,
currentDeviceId,
previousDeviceId,
};

// Partition key = groupId so a device's events keep their order on one
// partition; falls back to a random id only for the (unused) empty-group case.
const partitionKey = groupId || generateId();

if (shouldUseKafka()) {
await produceIncomingEvent(queueData, partitionKey);
} else {
await getEventsGroupQueueShard(partitionKey).add({
orderMs: timestamp,
data: queueData,
groupId,
jobId,
});
}
}

async function handleReplay({
Expand Down
10 changes: 10 additions & 0 deletions apps/api/src/utils/graceful-shutdown.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ch, db } from '@openpanel/db';
import {
cronQueue,
disconnectKafka,
eventsGroupQueues,
miscQueue,
notificationQueue,
Expand Down Expand Up @@ -84,6 +85,15 @@ export async function shutdown(
logger.error('Error closing queue state', error);
}

// Step 6.5: Disconnect Kafka producer (no-op if never initialized).
// Flushes any in-flight produces so an API rollout doesn't drop events.
try {
await disconnectKafka();
logger.info('Kafka producer disconnected');
} catch (error) {
logger.error('Error disconnecting Kafka producer', error);
}

// Step 7: Close Redis connections
try {
const redisConnections = [
Expand Down
56 changes: 52 additions & 4 deletions apps/worker/src/boot-workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
cronQueue,
eventsGroupQueues,
importQueue,
isKafkaConfigured,
miscQueue,
notificationQueue,
queueLogger,
Expand All @@ -20,6 +21,10 @@ import { Worker as GroupWorker } from 'groupmq';

import { cronJob } from './jobs/cron';
import { incomingEvent } from './jobs/events.incoming-event';
import {
type KafkaConsumerHandle,
startKafkaEventsConsumer,
} from './jobs/events.kafka-consumer';
import { importJob } from './jobs/import';
import { miscJob } from './jobs/misc';
import { notificationJob } from './jobs/notification';
Expand Down Expand Up @@ -160,11 +165,26 @@ export async function bootWorkers() {
: getEnabledQueues();

const workers: (Worker | GroupWorker<any>)[] = [];

// Start event workers based on enabled queues
const extraStops: Array<() => Promise<unknown>> = [];

// Does this pod handle event ingestion at all? (the events statefulset —
// NOT the cron/import deployments, which set ENABLED_QUEUES=cron / import).
// Gates the Kafka consumer so cron/import pods don't also consume events when
// KAFKA_BROKERS is set in the shared configmap.
const handlesEvents =
enabledQueues.includes('events') ||
enabledQueues.includes('events_kafka') ||
enabledQueues.some((q) => /^events_\d+$/.test(q));

// Start event workers based on enabled queues.
// When Kafka is configured the producer (api) routes every event to Kafka, so
// the GroupMQ event shards would only poll an empty queue — skip them on the
// events pod and let the Kafka consumer (below) handle ingestion instead.
const eventQueuesToStart: number[] = [];

if (enabledQueues.includes('events')) {
if (isKafkaConfigured() && handlesEvents) {
logger.info('Kafka is configured, skipping GroupMQ event workers');
} else if (enabledQueues.includes('events')) {
// Start all event shards
for (let i = 0; i < EVENTS_GROUP_QUEUES_SHARDS; i++) {
eventQueuesToStart.push(i);
Expand Down Expand Up @@ -205,6 +225,27 @@ export async function bootWorkers() {
logger.info(`Started worker for ${queueName}`, { concurrency });
}

// Start the Kafka events consumer. When Kafka is configured this fully
// replaces the GroupMQ event workers (which are skipped above). Only the
// events pod runs it — cron/import pods (handlesEvents=false) never consume.
if (isKafkaConfigured() && handlesEvents) {
let handle: KafkaConsumerHandle | null = null;
const startPromise = startKafkaEventsConsumer()
.then((h) => {
handle = h;
logger.info('Started Kafka events consumer');
})
.catch((err) => {
logger.error('Failed to start Kafka events consumer', { err });
});
extraStops.push(async () => {
await startPromise.catch(() => undefined);
if (handle) {
await handle.stop();
}
});
Comment on lines +231 to +246

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Fail boot when the Kafka consumer cannot start.

Line 238 logs and swallows startup failure after GroupMQ event workers have already been skipped on Lines 185-187. That can leave the events pod alive with no event consumer and no fallback path.

Proposed fix
   if (isKafkaConfigured() && handlesEvents) {
-    let handle: KafkaConsumerHandle | null = null;
-    const startPromise = startKafkaEventsConsumer()
-      .then((h) => {
-        handle = h;
-        logger.info('Started Kafka events consumer');
-      })
-      .catch((err) => {
-        logger.error('Failed to start Kafka events consumer', { err });
-      });
-    extraStops.push(async () => {
-      await startPromise.catch(() => undefined);
-      if (handle) {
-        await handle.stop();
-      }
-    });
+    try {
+      const handle = await startKafkaEventsConsumer();
+      logger.info('Started Kafka events consumer');
+      extraStops.push(() => handle.stop());
+    } catch (err) {
+      logger.error('Failed to start Kafka events consumer', { err });
+      throw err;
+    }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (isKafkaConfigured() && handlesEvents) {
let handle: KafkaConsumerHandle | null = null;
const startPromise = startKafkaEventsConsumer()
.then((h) => {
handle = h;
logger.info('Started Kafka events consumer');
})
.catch((err) => {
logger.error('Failed to start Kafka events consumer', { err });
});
extraStops.push(async () => {
await startPromise.catch(() => undefined);
if (handle) {
await handle.stop();
}
});
if (isKafkaConfigured() && handlesEvents) {
try {
const handle = await startKafkaEventsConsumer();
logger.info('Started Kafka events consumer');
extraStops.push(() => handle.stop());
} catch (err) {
logger.error('Failed to start Kafka events consumer', { err });
throw err;
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/worker/src/boot-workers.ts` around lines 231 - 246, The Kafka consumer
startup failure is being logged and swallowed in the startKafkaEventsConsumer
flow, which allows boot to continue with no event consumer after GroupMQ workers
were skipped. Update the boot path in boot-workers.ts so the failure from
startKafkaEventsConsumer propagates and aborts startup instead of only logging
in the catch, and keep the cleanup logic in the extraStops handle path
compatible with the failure case.

}

// Start sessions worker
if (enabledQueues.includes('sessions')) {
const concurrency = getConcurrencyFor('sessions');
Expand Down Expand Up @@ -352,7 +393,14 @@ export async function bootWorkers() {
await waitForQueueToEmpty(cronQueue);
}

await Promise.all(workers.map((worker) => worker.close()));
await Promise.all([
...workers.map((worker) => worker.close()),
...extraStops.map((stop) =>
stop().catch((err) => {
logger.error('extra stop handler error', { err });
}),
),
]);

logger.info('workers closed successfully', {
elapsed: performance.now() - time,
Expand Down
Loading