@@ -215,7 +215,18 @@ impl<S: LogStore> FlushJob<S> {
215215 Ok ( metas)
216216 }
217217
218- async fn write_manifest_and_apply ( & self , file_metas : & [ FileMeta ] ) -> Result < ( ) > {
218+ /// Generates random SST file name in format: `^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}.parquet$`
219+ fn generate_sst_file_name ( ) -> String {
220+ format ! ( "{}.parquet" , Uuid :: new_v4( ) . hyphenated( ) )
221+ }
222+ }
223+
224+ #[ async_trait]
225+ impl < S : LogStore > Job for FlushJob < S > {
226+ // TODO(yingwen): [flush] Support in-job parallelism (Flush memtables concurrently)
227+ async fn run ( & mut self , ctx : & Context ) -> Result < ( ) > {
228+ let file_metas = self . write_memtables_to_layer ( ctx) . await ?;
229+
219230 let edit = RegionEdit {
220231 region_version : self . shared . version_control . metadata ( ) . version ( ) ,
221232 flushed_sequence : Some ( self . flush_sequence ) ,
@@ -232,21 +243,7 @@ impl<S: LogStore> FlushJob<S> {
232243 Some ( self . max_memtable_id ) ,
233244 )
234245 . await ?;
235- self . wal . obsolete ( self . flush_sequence ) . await
236- }
237-
238- /// Generates random SST file name in format: `^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}.parquet$`
239- fn generate_sst_file_name ( ) -> String {
240- format ! ( "{}.parquet" , Uuid :: new_v4( ) . hyphenated( ) )
241- }
242- }
243-
244- #[ async_trait]
245- impl < S : LogStore > Job for FlushJob < S > {
246- // TODO(yingwen): [flush] Support in-job parallelism (Flush memtables concurrently)
247- async fn run ( & mut self , ctx : & Context ) -> Result < ( ) > {
248- let file_metas = self . write_memtables_to_layer ( ctx) . await ?;
249- Self :: write_manifest_and_apply ( self , & file_metas) . await ?;
246+ self . wal . obsolete ( self . flush_sequence ) . await ?;
250247
251248 if let Some ( cb) = self . on_success . lock ( ) . await . take ( ) {
252249 cb. await ;
0 commit comments