1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15+ use std:: marker:: PhantomData ;
1516use std:: sync:: Arc ;
1617
1718use async_trait:: async_trait;
18- use common_telemetry:: info;
19+ use common_telemetry:: { debug , info} ;
1920use table:: metadata:: TableId ;
2021use tokio:: sync:: { Notify , RwLock } ;
2122use tokio:: task:: JoinHandle ;
2223use tokio_util:: sync:: CancellationToken ;
2324
2425use crate :: compaction:: dedup_deque:: DedupDeque ;
26+ use crate :: compaction:: picker:: { Picker , SimplePicker } ;
2527use crate :: compaction:: rate_limit:: {
2628 CascadeRateLimiter , RateLimitToken , RateLimitTokenPtr , RateLimiter ,
2729} ;
28- use crate :: compaction:: task:: CompactionTask ;
30+ use crate :: compaction:: task:: { CompactionTask , CompactionTaskImpl } ;
2931use crate :: error:: Result ;
3032
3133/// Table compaction request.
@@ -87,9 +89,13 @@ impl LocalCompactionScheduler {
8789 req_queue : request_queue. clone ( ) ,
8890 cancel_token : cancel_token. child_token ( ) ,
8991 limiter : Arc :: new ( CascadeRateLimiter :: new ( vec ! [ ] ) ) ,
92+ picker : SimplePicker :: new ( ) ,
93+ _phantom_data : PhantomData :: < CompactionTaskImpl > :: default ( ) ,
9094 } ;
91- let join_handle: JoinHandle < ( ) > =
92- common_runtime:: spawn_bg ( async move { handler. run ( ) . await } ) ;
95+ let join_handle = common_runtime:: spawn_bg ( async move {
96+ debug ! ( "Compaction handler loop spawned" ) ;
97+ handler. run ( ) . await ;
98+ } ) ;
9399 Self {
94100 join_handle,
95101 request_queue,
@@ -100,24 +106,27 @@ impl LocalCompactionScheduler {
100106}
101107
102108#[ allow( unused) ]
103- struct CompactionHandler {
104- req_queue : Arc < RwLock < DedupDeque < TableId , CompactionRequest > > > ,
109+ struct CompactionHandler < R , T : CompactionTask , P : Picker < R , T > > {
110+ req_queue : Arc < RwLock < DedupDeque < TableId , R > > > ,
105111 cancel_token : CancellationToken ,
106112 task_notifier : Arc < Notify > ,
107- limiter : Arc < CascadeRateLimiter < CompactionRequest > > ,
113+ limiter : Arc < CascadeRateLimiter < R > > ,
114+ picker : P ,
115+ _phantom_data : PhantomData < T > ,
108116}
109117
110118#[ allow( unused) ]
111- impl CompactionHandler {
119+ impl < R , T : CompactionTask , P : Picker < R , T > > CompactionHandler < R , T , P > {
112120 /// Runs table compaction requests dispatch loop.
113- pub async fn run ( self ) {
121+ pub async fn run ( & self ) {
114122 let task_notifier = self . task_notifier . clone ( ) ;
115123 let limiter = self . limiter . clone ( ) ;
116124 loop {
117125 tokio:: select! {
118126 _ = task_notifier. notified( ) => {
119127 // poll requests as many as possible until rate limited, and then wait for
120128 // notification (some task's finished).
129+ debug!( "Notified, task size: {:?}" , self . req_queue. read( ) . await . len( ) ) ;
121130 while let Some ( ( table_id, req) ) = self . poll_task( ) . await {
122131 if let Ok ( token) = limiter. acquire_token( & req) {
123132 self . handle_compaction_request( req, token) . await ;
@@ -138,38 +147,95 @@ impl CompactionHandler {
138147 }
139148
140149 #[ inline]
141- async fn poll_task ( & self ) -> Option < ( TableId , CompactionRequest ) > {
150+ async fn poll_task ( & self ) -> Option < ( TableId , R ) > {
142151 let mut queue = self . req_queue . write ( ) . await ;
143152 queue. pop_front ( )
144153 }
145154
146155 /// Puts request back to the front of request queue.
147156 #[ inline]
148- async fn put_back_req ( & self , table_id : TableId , req : CompactionRequest ) {
157+ async fn put_back_req ( & self , table_id : TableId , req : R ) {
149158 let mut queue = self . req_queue . write ( ) . await ;
150159 queue. push_front ( table_id, req) ;
151160 }
152161
153162 // Handles compaction request, submit task to bg runtime.
154- async fn handle_compaction_request (
155- & self ,
156- mut req : CompactionRequest ,
157- token : RateLimitTokenPtr ,
158- ) {
163+ async fn handle_compaction_request ( & self , mut req : R , token : RateLimitTokenPtr ) -> Result < ( ) > {
159164 let cloned_notify = self . task_notifier . clone ( ) ;
160- let task = self . build_compaction_task ( req) . await ;
165+ let task = self . build_compaction_task ( req) . await ? ;
161166
162167 common_runtime:: spawn_bg ( async move {
163- task. run ( ) ;
164- // releases rate limit token
168+ task. run ( ) . await ; // TODO(hl): handle errors
169+ // releases rate limit token
165170 token. try_release ( ) ;
166171 // notify scheduler to schedule next task when current task finishes.
167172 cloned_notify. notify_one ( ) ;
168173 } ) ;
174+
175+ Ok ( ( ) )
169176 }
170177
171178 // TODO(hl): generate compaction task(find SSTs to compact along with the output of compaction)
172- async fn build_compaction_task ( & self , req : CompactionRequest ) -> CompactionTask {
173- todo ! ( )
179+ async fn build_compaction_task ( & self , req : R ) -> crate :: error:: Result < T > {
180+ self . picker . pick ( & req)
181+ }
182+ }
183+
184+ #[ cfg( test) ]
185+ mod tests {
186+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
187+
188+ use tokio:: sync:: Barrier ;
189+
190+ use super :: * ;
191+ use crate :: compaction:: picker:: tests:: MockPicker ;
192+ use crate :: compaction:: rate_limit:: MaxInflightTaskLimiter ;
193+
194+ #[ tokio:: test]
195+ async fn test_schedule_handler ( ) {
196+ common_telemetry:: init_default_ut_logging ( ) ;
197+ let queue = Arc :: new ( RwLock :: new ( DedupDeque :: default ( ) ) ) ;
198+ let task_finished = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
199+
200+ let task_finished_clone = task_finished. clone ( ) ;
201+ let barrier = Arc :: new ( Barrier :: new ( 3 ) ) ;
202+ let barrier_clone = barrier. clone ( ) ;
203+ let picker = MockPicker {
204+ cbs : vec ! [ Arc :: new( move || {
205+ debug!( "Running callback" ) ;
206+ task_finished_clone. fetch_add( 1 , Ordering :: Relaxed ) ;
207+ let barrier_clone_2 = barrier_clone. clone( ) ;
208+ Box :: pin( async move {
209+ barrier_clone_2. wait( ) . await ;
210+ } )
211+ } ) ] ,
212+ } ;
213+ let handler = Arc :: new ( CompactionHandler {
214+ req_queue : queue. clone ( ) ,
215+ cancel_token : Default :: default ( ) ,
216+ task_notifier : Arc :: new ( Default :: default ( ) ) ,
217+ limiter : Arc :: new ( CascadeRateLimiter :: new ( vec ! [ Box :: new(
218+ MaxInflightTaskLimiter :: new( 3 ) ,
219+ ) ] ) ) ,
220+ picker,
221+ _phantom_data : Default :: default ( ) ,
222+ } ) ;
223+
224+ let handler_cloned = handler. clone ( ) ;
225+ common_runtime:: spawn_bg ( async move { handler_cloned. run ( ) . await } ) ;
226+
227+ queue
228+ . write ( )
229+ . await
230+ . push_back ( 1 , CompactionRequest :: default ( ) ) ;
231+ handler. task_notifier . notify_one ( ) ;
232+ queue
233+ . write ( )
234+ . await
235+ . push_back ( 2 , CompactionRequest :: default ( ) ) ;
236+ handler. task_notifier . notify_one ( ) ;
237+
238+ barrier. wait ( ) . await ;
239+ assert_eq ! ( 2 , task_finished. load( Ordering :: Relaxed ) ) ;
174240 }
175241}
0 commit comments