@@ -28,6 +28,7 @@ class EventBuffer {
2828 private initialRetryDelay : number ;
2929 private shutdown : boolean = false ;
3030 private stopped : boolean = false ;
31+ private flushing : boolean = false ; // Add flush state tracking
3132
3233 constructor ( eventsApi : Events , opts ?: EventBufferOptions ) {
3334 const {
@@ -50,56 +51,61 @@ class EventBuffer {
5051 }
5152
5253 public async flush ( ) : Promise < void > {
53- if ( this . events . length === 0 ) {
54+ if ( this . events . length === 0 || this . flushing ) {
5455 return ;
5556 }
5657
57- const events = [ ...this . events ] ;
58- this . events = [ ] ;
59-
60- // Initialize retry counter and success flag
61- let retryCount = 0 ;
62- let success = false ;
63- let lastError : any = null ;
64-
65- // Try with retries and exponential backoff
66- while ( retryCount <= this . maxRetries && ! success ) {
67- try {
68- if ( retryCount > 0 ) {
69- // Log retry attempt
70- this . logger . info ( `Retrying event batch submission (attempt ${ retryCount } of ${ this . maxRetries } )` ) ;
71- }
58+ this . flushing = true ;
59+ try {
60+ const events = [ ...this . events ] ;
61+ this . events = [ ] ;
62+
63+ // Initialize retry counter and success flag
64+ let retryCount = 0 ;
65+ let success = false ;
66+ let lastError : any = null ;
67+
68+ // Try with retries and exponential backoff
69+ while ( retryCount <= this . maxRetries && ! success ) {
70+ try {
71+ if ( retryCount > 0 ) {
72+ // Log retry attempt
73+ this . logger . info ( `Retrying event batch submission (attempt ${ retryCount } of ${ this . maxRetries } )` ) ;
74+ }
7275
73- // Attempt to send events
74- await this . eventsApi . createEventBatch ( { events } ) ;
75- success = true ;
76- } catch ( err ) {
77- lastError = err ;
78- retryCount ++ ;
79-
80- if ( retryCount <= this . maxRetries ) {
81- // Calculate backoff with jitter
82- const delay = this . initialRetryDelay * Math . pow ( 2 , retryCount - 1 ) ;
83- const jitter = Math . random ( ) * 0.1 * delay ; // 10% jitter
84- const waitTime = delay + jitter ;
85-
86- this . logger . warn (
87- `Event batch submission failed: ${ err } . Retrying in ${ ( waitTime / 1000 ) . toFixed ( 2 ) } seconds...`
88- ) ;
89-
90- // Wait before retry
91- if ( process . env . NODE_ENV !== "test" ) {
92- await new Promise ( ( resolve ) => setTimeout ( resolve , waitTime ) ) ;
76+ // Attempt to send events
77+ await this . eventsApi . createEventBatch ( { events } ) ;
78+ success = true ;
79+ } catch ( err ) {
80+ lastError = err ;
81+ retryCount ++ ;
82+
83+ if ( retryCount <= this . maxRetries ) {
84+ // Calculate backoff with jitter
85+ const delay = this . initialRetryDelay * Math . pow ( 2 , retryCount - 1 ) ;
86+ const jitter = Math . random ( ) * 0.1 * delay ; // 10% jitter
87+ const waitTime = delay + jitter ;
88+
89+ this . logger . warn (
90+ `Event batch submission failed: ${ err } . Retrying in ${ ( waitTime / 1000 ) . toFixed ( 2 ) } seconds...`
91+ ) ;
92+
93+ // Wait before retry
94+ if ( process . env . NODE_ENV !== "test" ) {
95+ await new Promise ( ( resolve ) => setTimeout ( resolve , waitTime ) ) ;
96+ }
9397 }
9498 }
9599 }
96- }
97100
98- // After all retries, if still not successful, log the error
99- if ( ! success ) {
100- this . logger . error ( `Event batch submission failed after ${ this . maxRetries } retries:` , lastError ) ;
101- } else if ( retryCount > 0 ) {
102- this . logger . info ( `Event batch submission succeeded after ${ retryCount } retries` ) ;
101+ // After all retries, if still not successful, log the error
102+ if ( ! success ) {
103+ this . logger . error ( `Event batch submission failed after ${ this . maxRetries } retries:` , lastError ) ;
104+ } else if ( retryCount > 0 ) {
105+ this . logger . info ( `Event batch submission succeeded after ${ retryCount } retries` ) ;
106+ }
107+ } finally {
108+ this . flushing = false ;
103109 }
104110 }
105111
@@ -113,7 +119,7 @@ class EventBuffer {
113119 return ;
114120 }
115121
116- if ( this . events . length >= this . maxSize ) {
122+ if ( this . events . length >= this . maxSize && ! this . flushing ) {
117123 await this . flush ( ) ;
118124 }
119125
0 commit comments