Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions workers/main/src/configs/schedules.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { Client } from '@temporalio/client';

import { logger } from '../logger';
import { workerConfig } from './worker';

const SCHEDULE_ID = 'weekly-financial-report-schedule';

/**
* Checks if an error is a "not found" error
*/
function validateIsScheduleNotFoundError(error: unknown): boolean {
return (
(error as { code?: number }).code === 5 ||
(error instanceof Error &&
error.message.toLowerCase().includes('not found'))
);
}

/**
* Checks if schedule exists, returns true if it exists
*/
async function validateScheduleExists(client: Client): Promise<boolean> {
try {
const scheduleHandle = client.schedule.getHandle(SCHEDULE_ID);

await scheduleHandle.describe();
logger.info(`Schedule ${SCHEDULE_ID} already exists, skipping creation`);

return true;
} catch (error) {
if (!validateIsScheduleNotFoundError(error)) {
throw error;
}
logger.info(`Schedule ${SCHEDULE_ID} not found, creating new schedule`);

return false;
}
}

/**
* Creates schedule with race condition protection
*/
async function createScheduleWithRaceProtection(client: Client): Promise<void> {
try {
await client.schedule.create({
scheduleId: SCHEDULE_ID,
spec: {
cronExpressions: ['0 13 * * 2'],
timezone: 'America/New_York',
},
action: {
type: 'startWorkflow',
workflowType: 'weeklyFinancialReportsWorkflow',
taskQueue: workerConfig.taskQueue,
workflowId: `weekly-financial-report-scheduled`,
},
policies: {
overlap: 'SKIP',
catchupWindow: '1 day',
},
});

logger.info(
`Successfully created schedule ${SCHEDULE_ID} for weekly financial reports`,
);
} catch (createError) {
// Handle race condition: schedule was created by another worker
const isAlreadyExists =
(createError as { code?: number }).code === 6 ||
(createError instanceof Error &&
(createError.message.toLowerCase().includes('already exists') ||
createError.message.toLowerCase().includes('already running')));

if (isAlreadyExists) {
logger.info(
`Schedule ${SCHEDULE_ID} already exists (created by another worker), treating as success`,
);

return;
}

throw createError;
}
}

/**
* Sets up the weekly financial report schedule
* Schedule runs every Tuesday at 1 PM America/New_York time (EST/EDT)
* @param client - Temporal client instance
*/
export async function setupWeeklyReportSchedule(client: Client): Promise<void> {
try {
const isScheduleExists = await validateScheduleExists(client);

if (isScheduleExists) {
return;
}

await createScheduleWithRaceProtection(client);
} catch (error) {
logger.error(
`Failed to setup schedule ${SCHEDULE_ID}: ${error instanceof Error ? error.message : String(error)}`,
);
throw error;
}
}

/**
* Schedule configuration exported for documentation and testing
*/
export const scheduleConfig = {
scheduleId: SCHEDULE_ID,
cronExpression: '0 13 * * 2',
timezone: 'America/New_York',
description: 'Runs every Tuesday at 1 PM EST/EDT',
} as const;
16 changes: 16 additions & 0 deletions workers/main/src/configs/temporal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,26 @@ import { z } from 'zod';

const DEFAULT_TEMPORAL_ADDRESS = 'temporal:7233';

/**
* Temporal connection configuration
* Used by both workers and clients to connect to Temporal server
*/
export const temporalConfig: NativeConnectionOptions = {
address: process.env.TEMPORAL_ADDRESS || DEFAULT_TEMPORAL_ADDRESS,
};

export const temporalSchema = z.object({
TEMPORAL_ADDRESS: z.string().default(DEFAULT_TEMPORAL_ADDRESS),
});

/**
* Schedule Configuration Documentation
*
* Weekly Financial Report Schedule:
* The schedule is automatically created/verified when the worker starts.
*
* For schedule configuration details (schedule ID, cron expression, timezone, etc.),
* see the exported `scheduleConfig` object in ./schedules.ts
*
* Implementation: ./schedules.ts
*/
3 changes: 2 additions & 1 deletion workers/main/src/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

import { handleRunError, logger } from './index';
import { handleRunError } from './index';
import { logger } from './logger';

describe('handleRunError', () => {
let processExitSpy: ReturnType<typeof vi.spyOn>;
Expand Down
23 changes: 20 additions & 3 deletions workers/main/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { DefaultLogger, NativeConnection, Worker } from '@temporalio/worker';
import { Client, Connection } from '@temporalio/client';
import { NativeConnection, Worker } from '@temporalio/worker';

import * as activities from './activities';
import { validateEnv } from './common/utils';
import { setupWeeklyReportSchedule } from './configs/schedules';
import { temporalConfig } from './configs/temporal';
import { workerConfig } from './configs/worker';

export const logger = new DefaultLogger('ERROR');
import { logger } from './logger';

validateEnv();

Expand All @@ -25,6 +26,22 @@ export async function createWorker(connection: NativeConnection) {
}

export async function run(): Promise<void> {
// Setup weekly report schedule before starting worker
const clientConnection = await Connection.connect(temporalConfig);

try {
const client = new Client({ connection: clientConnection });

await setupWeeklyReportSchedule(client);
} catch (err) {
logger.error(
`Failed to setup schedule: ${err instanceof Error ? err.message : String(err)}`,
);
} finally {
await clientConnection.close();
}

// Create and run worker
const connection = await createConnection();

try {
Expand Down
8 changes: 8 additions & 0 deletions workers/main/src/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { DefaultLogger } from '@temporalio/worker';

/**
* Shared logger instance for the worker
* Using INFO level to capture important operational messages
* including schedule setup, errors, and warnings
*/
export const logger = new DefaultLogger('INFO');
Loading