Skip to content

Commit cf14d67

Browse files
committed
feat(sidetrack): allow initializing sidetrack on layer creation
1 parent f27e11d commit cf14d67

File tree

4 files changed

+63
-1
lines changed

4 files changed

+63
-1
lines changed

.changeset/fresh-squids-push.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"sidetrack": patch
3+
---
4+
5+
Support initializing sidetrack on layer creation

packages/sidetrack/src/effect.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ export function layer<Queues extends SidetrackQueuesGenericType>(
504504
`INSERT INTO sidetrack_cron_jobs (queue, cron_expression, payload, timezone)
505505
VALUES ($1, $2, $3, $4)
506506
ON CONFLICT (queue, cron_expression) DO UPDATE
507-
SET payload = $3 RETURNING *`,
507+
SET payload = $3, timezone = $4 RETURNING *`,
508508
[
509509
queueName,
510510
cronExpression,
@@ -770,6 +770,10 @@ export function layer<Queues extends SidetrackQueuesGenericType>(
770770
}),
771771
);
772772

773+
if (layerOptions.startOnInitialization) {
774+
yield* start();
775+
}
776+
773777
return {
774778
cancelJob,
775779
deactivateCronSchedule,

packages/sidetrack/src/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ export interface SidetrackOptions<Queues extends SidetrackQueuesGenericType> {
8383
*/
8484
pollingInterval?: PollingInterval;
8585
queues: SidetrackQueues<Queues>;
86+
/**
87+
* This is useful if you want it to automatically start polling when you initialize sidetrack instead of waiting to call start.
88+
* Particularly useful if you are running sidetrack in a long running process on its own.
89+
* e.g. if you are using Layer.launch with Effect (@link https://effect.website/docs/requirements-management/layers/#converting-a-layer-to-an-effect)
90+
*/
91+
startOnInitialization?: boolean;
8692
}
8793

8894
export class SidetrackJobRunError extends Data.TaggedError(

packages/sidetrack/test/effect.test.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,4 +454,51 @@ describe("Effect API", () => {
454454
await Effect.runPromise(program.pipe(Effect.provide(sidetrackLayer)));
455455
});
456456
});
457+
458+
it("automatically starts job processing when startOnInitialization is true", async () => {
459+
await runInTransaction(pool, async (client) => {
460+
interface Queues {
461+
test: { id: string };
462+
}
463+
464+
const SidetrackService = SidetrackEffect.getSidetrackService<Queues>();
465+
let jobProcessed = false;
466+
467+
const sidetrackLayer = SidetrackEffect.layer<Queues>({
468+
dbClient: usePg(client),
469+
pollingInterval: 100,
470+
queues: {
471+
test: {
472+
run: (payload) => {
473+
jobProcessed = true;
474+
expect(payload.id).toBe("auto-start");
475+
return Promise.resolve(payload);
476+
},
477+
},
478+
},
479+
startOnInitialization: true,
480+
});
481+
482+
const program = Effect.gen(function* () {
483+
const sidetrack = yield* SidetrackService;
484+
485+
// Insert a job that should be automatically processed
486+
const job = yield* sidetrack.insertJob("test", { id: "auto-start" });
487+
488+
// Allow some time for the job to be processed by the automatic polling
489+
yield* Effect.sleep("300 millis");
490+
491+
// We shouldn't need to call start() or runJob() explicitly
492+
const completedJob = yield* sidetrack.getJob(job.id);
493+
494+
expect(jobProcessed).toBe(true);
495+
expect(completedJob.status).toBe("completed");
496+
497+
// Clean up by stopping the service
498+
sidetrack.stop();
499+
});
500+
501+
await Effect.runPromise(program.pipe(Effect.provide(sidetrackLayer)));
502+
});
503+
});
457504
});

0 commit comments

Comments
 (0)