@@ -30,7 +30,8 @@ use crate::metrics::{UPLOAD_BYTES_TOTAL, WRITE_AND_UPLOAD_ELAPSED_TOTAL};
3030use crate :: read:: Source ;
3131use crate :: sst:: file:: FileId ;
3232use crate :: sst:: parquet:: writer:: ParquetWriter ;
33- use crate :: sst:: parquet:: { SstInfo , WriteOptions , DEFAULT_WRITE_BUFFER_SIZE } ;
33+ use crate :: sst:: parquet:: { SstInfo , WriteOptions } ;
34+ use crate :: sst:: DEFAULT_WRITE_BUFFER_SIZE ;
3435
3536/// A cache for uploading files to remote object stores.
3637///
@@ -137,3 +138,94 @@ pub struct SstUploadRequest {
137138 /// Remote object store to upload.
138139 pub remote_store : ObjectStore ,
139140}
141+
142+ #[ cfg( test) ]
143+ mod tests {
144+ use api:: v1:: OpType ;
145+ use common_base:: readable_size:: ReadableSize ;
146+ use common_test_util:: temp_dir:: create_temp_dir;
147+ use object_store:: manager:: ObjectStoreManager ;
148+ use object_store:: services:: Fs ;
149+ use object_store:: ObjectStore ;
150+ use store_api:: storage:: RegionId ;
151+
152+ use super :: * ;
153+ use crate :: cache:: file_cache:: { self , FileCache } ;
154+ use crate :: cache:: test_util:: new_fs_store;
155+ use crate :: sst:: file:: FileId ;
156+ use crate :: sst:: location:: sst_file_path;
157+ use crate :: test_util:: sst_util:: {
158+ new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
159+ } ;
160+ use crate :: test_util:: { build_rows, new_batch_builder, CreateRequestBuilder , TestEnv } ;
161+
162+ #[ tokio:: test]
163+ async fn test_write_and_upload_sst ( ) {
164+ // Create a local object store and FileCache.
165+ let local_dir = create_temp_dir ( "" ) ;
166+ let local_store = new_fs_store ( local_dir. path ( ) . to_str ( ) . unwrap ( ) ) ;
167+ let cache = FileCache :: new ( local_store. clone ( ) , ReadableSize :: mb ( 10 ) ) ;
168+ let file_id = FileId :: random ( ) ;
169+
170+ // Create Source
171+ let metadata = Arc :: new ( sst_region_metadata ( ) ) ;
172+ let region_id = metadata. region_id ;
173+ let source = new_source ( & [
174+ new_batch_by_range ( & [ "a" , "d" ] , 0 , 60 ) ,
175+ new_batch_by_range ( & [ "b" , "f" ] , 0 , 40 ) ,
176+ new_batch_by_range ( & [ "b" , "h" ] , 100 , 200 ) ,
177+ ] ) ;
178+
179+ // TODO(QuenKar): maybe find a way to create some object server for testing,
180+ // and now just use local file system to mock.
181+ let mut env = TestEnv :: new ( ) ;
182+ let mock_store = env. init_object_store_manager ( ) ;
183+ let upload_path = sst_file_path ( "test" , file_id) ;
184+
185+ // Create WriteCache
186+ let object_store_manager = Arc :: new ( ObjectStoreManager :: new ( "mock" , mock_store. clone ( ) ) ) ;
187+ let write_cache = WriteCache :: new (
188+ local_store. clone ( ) ,
189+ object_store_manager,
190+ ReadableSize :: mb ( 10 ) ,
191+ ) ;
192+
193+ let request = SstUploadRequest {
194+ file_id,
195+ metadata,
196+ source,
197+ storage : Some ( "mock" . to_string ( ) ) ,
198+ upload_path : upload_path. clone ( ) ,
199+ remote_store : mock_store. clone ( ) ,
200+ } ;
201+
202+ let write_opts = WriteOptions {
203+ row_group_size : 512 ,
204+ ..Default :: default ( )
205+ } ;
206+
207+ // Write to cache and upload sst to mock remote store
208+ let sst_info = write_cache
209+ . write_and_upload_sst ( request, & write_opts)
210+ . await
211+ . unwrap ( )
212+ . unwrap ( ) ;
213+
214+ // Check write cache contains the key
215+ let key = ( region_id, file_id) ;
216+ assert ! ( write_cache. file_cache. contains_key( & key) ) ;
217+
218+ // Check file data
219+ let remote_data = mock_store. read ( & upload_path) . await . unwrap ( ) ;
220+ let cache_data = local_store. read ( & cache. cache_file_path ( key) ) . await . unwrap ( ) ;
221+ assert_eq ! ( remote_data. len( ) , 3434 ) ;
222+ assert_eq ! ( remote_data, cache_data) ;
223+
224+ // Delete test files
225+ mock_store
226+ . delete ( & upload_path)
227+ . await
228+ . expect ( "delete must succeed" ) ;
229+ write_cache. file_cache . remove ( key) ;
230+ }
231+ }
0 commit comments