11import { Queue , Worker , Job , ConnectionOptions , QueueOptions , WorkerOptions } from 'bullmq' ;
2+ import { jsonifyError , Logger } from '@chimera-monorepo/utils' ;
23
34export { Queue , Worker , Job } from 'bullmq' ;
45
@@ -24,15 +25,38 @@ export const parseRedisUrl = (redisUrl: string): ConnectionOptions => {
2425 ...( url . username ? { username : url . username } : { } ) ,
2526 ...( url . protocol === 'rediss:' ? { tls : tlsServername ? { servername : tlsServername } : { } } : { } ) ,
2627 connectTimeout : 17_000 ,
27- maxRetriesPerRequest : 4 ,
28- retryStrategy : ( times : number ) => Math . min ( times * 30 , 1000 ) ,
28+ maxRetriesPerRequest : null ,
29+ retryStrategy : ( times : number ) => Math . min ( times * 50 , 5_000 ) ,
2930 keepAlive : 30_000 ,
3031 } ;
3132} ;
3233
33- export const createProducer = ( redisUrl : string , queueName : string , opts ?: Partial < QueueOptions > ) : Queue => {
34+ export const pingRedis = async ( queueOrWorker : Queue | Worker , timeoutMs = 3_000 ) : Promise < boolean > => {
35+ try {
36+ const client = await Promise . race ( [
37+ queueOrWorker . client ,
38+ new Promise < never > ( ( _ , reject ) => setTimeout ( ( ) => reject ( new Error ( 'timeout' ) ) , timeoutMs ) ) ,
39+ ] ) ;
40+ if ( client . status !== 'ready' ) return false ;
41+
42+ const result = await Promise . race ( [
43+ client . ping ( ) ,
44+ new Promise < never > ( ( _ , reject ) => setTimeout ( ( ) => reject ( new Error ( 'timeout' ) ) , timeoutMs ) ) ,
45+ ] ) ;
46+ return result === 'PONG' ;
47+ } catch {
48+ return false ;
49+ }
50+ } ;
51+
52+ export const createProducer = (
53+ redisUrl : string ,
54+ queueName : string ,
55+ logger : Logger ,
56+ opts ?: Partial < QueueOptions > ,
57+ ) : Queue => {
3458 const connection = parseRedisUrl ( redisUrl ) ;
35- return new Queue ( queueName , {
59+ const queue = new Queue ( queueName , {
3660 connection,
3761 defaultJobOptions : {
3862 attempts : 3 ,
@@ -42,19 +66,40 @@ export const createProducer = (redisUrl: string, queueName: string, opts?: Parti
4266 } ,
4367 ...opts ,
4468 } ) ;
69+ queue . on ( 'error' , ( err ) =>
70+ logger . error ( `Queue "${ queueName } " Redis connection error` , undefined , undefined , jsonifyError ( err ) , {
71+ queueName,
72+ role : 'producer' ,
73+ } ) ,
74+ ) ;
75+ return queue ;
4576} ;
4677
4778export const createWorker = (
4879 redisUrl : string ,
4980 queueName : string ,
5081 processor : ( job : Job ) => Promise < void > ,
82+ logger : Logger ,
5183 opts ?: Partial < WorkerOptions > ,
5284) : Worker => {
5385 const connection = parseRedisUrl ( redisUrl ) ;
54- return new Worker ( queueName , processor , {
86+ const worker = new Worker ( queueName , processor , {
5587 connection,
5688 concurrency : 1 ,
5789 lockDuration : 300_000 , // 5 minutes — prevents stale-lock re-processing for long tasks
5890 ...opts ,
5991 } ) ;
92+ worker . on ( 'ready' , ( ) =>
93+ logger . info ( `Worker "${ queueName } " Redis connected` , undefined , undefined , {
94+ queueName,
95+ role : 'worker' ,
96+ } ) ,
97+ ) ;
98+ worker . on ( 'error' , ( err ) =>
99+ logger . error ( `Worker "${ queueName } " Redis connection error` , undefined , undefined , jsonifyError ( err ) , {
100+ queueName,
101+ role : 'worker' ,
102+ } ) ,
103+ ) ;
104+ return worker ;
60105} ;
0 commit comments