@@ -136,7 +136,7 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
136136 schema_name = self .schema .name , table_name = table
137137 )
138138 truncate_prefixes .add (posixpath .join (self .dataset_path , table_prefix ))
139- # print(f"TRUNCATE PREFIXES {truncate_prefixes}")
139+ # print(f"TRUNCATE PREFIXES {truncate_prefixes} on {truncate_tables} ")
140140
141141 for truncate_dir in truncated_dirs :
142142 # get files in truncate dirs
@@ -145,16 +145,22 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
145145 logger .info (f"Will truncate tables in { truncate_dir } " )
146146 try :
147147 all_files = self .fs_client .ls (truncate_dir , detail = False , refresh = True )
148- logger .info (f"Found { len (all_files )} CANDIDATE files in { truncate_dir } " )
149- print (f"in truncate dir { truncate_dir } : { all_files } " )
148+ # logger.debug (f"Found {len(all_files)} CANDIDATE files in {truncate_dir}")
149+ # print(f"in truncate dir {truncate_dir}: {all_files}")
150150 for item in all_files :
151151 # check every file against all the prefixes
152152 for search_prefix in truncate_prefixes :
153153 if item .startswith (search_prefix ):
154154 # NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors
155- # logger.info(f"DEL {item}")
156- print (f"DEL { item } " )
157- self .fs_client .rm (item )
155+ # print(f"DEL {item}")
156+ try :
157+ # NOTE: must use rm_file to get errors on delete
158+ self .fs_client .rm_file (item )
159+ except NotImplementedError :
160+ # not all filesystem implement the above
161+ self .fs_client .rm (item )
162+ if self .fs_client .exists (item ):
163+ raise FileExistsError (item )
158164 except FileNotFoundError :
159165 logger .info (
160166 f"Directory or path to truncate tables { truncate_dir } does not exist but it"
0 commit comments