@@ -77,100 +77,61 @@ impl Operator<FetchLogInput, FetchLogOutput> for FetchLogOperator {
         );
 
         let mut log_client = self.log_client.clone();
-        let limit_offset = log_client
+        let mut limit_offset = log_client
             .scout_logs(&self.tenant, self.collection_uuid, self.start_log_offset_id)
             .await
-            .ok();
+            .inspect_err(|err| {
+                tracing::error!("could not pull logs: {err:?}");
+            })?;
         let mut fetched = Vec::new();
         let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as i64;
 
-        if let Some(mut limit_offset) = limit_offset {
-            tracing::debug!(
-                "taking new code path with range [{}, {})",
-                self.start_log_offset_id,
-                limit_offset
+        if let Some(maximum_fetch_count) = self.maximum_fetch_count {
+            limit_offset = std::cmp::min(
+                limit_offset,
+                self.start_log_offset_id + maximum_fetch_count as u64,
             );
-            if let Some(maximum_fetch_count) = self.maximum_fetch_count {
-                limit_offset = std::cmp::min(
-                    limit_offset,
-                    self.start_log_offset_id + maximum_fetch_count as u64,
-                );
-            }
-            let window_size: usize = self.batch_size as usize;
-            let ranges = (self.start_log_offset_id..limit_offset)
-                .step_by(window_size)
-                .map(|x| (x, std::cmp::min(x + window_size as u64, limit_offset)))
-                .collect::<Vec<_>>();
-            let sema = Arc::new(tokio::sync::Semaphore::new(10));
-            let batch_readers = ranges
-                .into_iter()
-                .map(|(start, limit)| {
-                    let mut log_client = log_client.clone();
-                    let collection_uuid = self.collection_uuid;
-                    let num_records = (limit - start) as i32;
-                    let start = start as i64;
-                    let sema = Arc::clone(&sema);
-                    async move {
-                        let _permit = sema.acquire().await.unwrap();
-                        log_client
-                            .read(
-                                &self.tenant,
-                                collection_uuid,
-                                start,
-                                num_records,
-                                Some(timestamp),
-                            )
-                            .await
-                    }
-                })
-                .collect::<Vec<_>>();
-            let batches = futures::future::join_all(batch_readers).await;
-            for batch in batches {
-                match batch {
-                    Ok(batch) => fetched.extend(batch),
-                    Err(err) => {
-                        return Err(FetchLogError::PullLog(Box::new(err)));
-                    }
-                }
-            }
-            fetched.sort_by_key(|f| f.log_offset);
-            Ok(Chunk::new(fetched.into()))
-        } else {
-            // old behavior that we fall back to if the scout is not implemented
-            let mut offset = self.start_log_offset_id as i64;
-            loop {
-                let mut log_batch = log_client
-                    .read(
-                        &self.tenant,
-                        self.collection_uuid,
-                        offset,
-                        self.batch_size as i32,
-                        Some(timestamp),
-                    )
-                    .await?;
-
-                let retrieve_count = log_batch.len();
+        }
 
-                if let Some(last_log) = log_batch.last() {
-                    offset = last_log.log_offset + 1;
-                    fetched.append(&mut log_batch);
-                    if let Some(limit) = self.maximum_fetch_count {
-                        if fetched.len() >= limit as usize {
-                            // Enough logs have been fetched
-                            fetched.truncate(limit as usize);
-                            break;
-                        }
-                    }
+        let window_size: usize = self.batch_size as usize;
+        let ranges = (self.start_log_offset_id..limit_offset)
+            .step_by(window_size)
+            .map(|x| (x, std::cmp::min(x + window_size as u64, limit_offset)))
+            .collect::<Vec<_>>();
+        let sema = Arc::new(tokio::sync::Semaphore::new(10));
+        let batch_readers = ranges
+            .into_iter()
+            .map(|(start, limit)| {
+                let mut log_client = log_client.clone();
+                let collection_uuid = self.collection_uuid;
+                let num_records = (limit - start) as i32;
+                let start = start as i64;
+                let sema = Arc::clone(&sema);
+                async move {
+                    let _permit = sema.acquire().await.unwrap();
+                    log_client
+                        .read(
+                            &self.tenant,
+                            collection_uuid,
+                            start,
+                            num_records,
+                            Some(timestamp),
+                        )
+                        .await
                 }
-
-                if retrieve_count < self.batch_size as usize {
-                    // No more logs to fetch
-                    break;
+            })
+            .collect::<Vec<_>>();
+        let batches = futures::future::join_all(batch_readers).await;
+        for batch in batches {
+            match batch {
+                Ok(batch) => fetched.extend(batch),
+                Err(err) => {
+                    return Err(FetchLogError::PullLog(Box::new(err)));
                 }
             }
-            tracing::info!(name: "Fetched log records", num_records = fetched.len());
-            Ok(Chunk::new(fetched.into()))
         }
+        fetched.sort_by_key(|f| f.log_offset);
+        Ok(Chunk::new(fetched.into()))
     }
 }
 
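For reference, the new path drops the sequential fallback loop: `scout_logs` now returns the exclusive upper offset (or an error), the range is split into `batch_size` windows, at most 10 `read` calls run concurrently behind a `tokio::sync::Semaphore`, and `futures::future::join_all` collects the batches before a final sort by `log_offset`. Below is a minimal, self-contained sketch of that bounded-concurrency pattern only; the `read_window` helper and the offsets are illustrative, not part of this change.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a single log-service `read` call.
async fn read_window(start: u64, limit: u64) -> Result<Vec<u64>, String> {
    Ok((start..limit).collect())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let (start_offset, limit_offset, window_size) = (0u64, 100u64, 32u64);

    // Same windowing as the operator: each batch covers [x, min(x + window, limit)).
    let ranges: Vec<(u64, u64)> = (start_offset..limit_offset)
        .step_by(window_size as usize)
        .map(|x| (x, std::cmp::min(x + window_size, limit_offset)))
        .collect();

    // Cap in-flight reads at 10, mirroring Semaphore::new(10) in the diff.
    let sema = Arc::new(tokio::sync::Semaphore::new(10));
    let readers = ranges.into_iter().map(|(start, limit)| {
        let sema = Arc::clone(&sema);
        async move {
            let _permit = sema.acquire().await.expect("semaphore closed");
            read_window(start, limit).await
        }
    });

    // join_all yields results in window order, but the operator does not rely
    // on that: it still sorts the combined records by log_offset afterwards.
    let mut fetched = Vec::new();
    for batch in futures::future::join_all(readers).await {
        fetched.extend(batch?);
    }
    fetched.sort_unstable();
    assert_eq!(fetched.len(), (limit_offset - start_offset) as usize);
    Ok(())
}
```

As in the diff, a single failed window aborts the whole fetch (the operator maps it to `FetchLogError::PullLog`), consistent with the `?` on the `scout_logs` call.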