18
18
//! Defines common code used in execution plans
19
19
20
20
use std:: fs;
21
- use std:: fs:: { metadata, File } ;
22
- use std:: path:: { Path , PathBuf } ;
21
+ use std:: fs:: metadata;
23
22
use std:: sync:: Arc ;
24
23
25
24
use super :: SendableRecordBatchStream ;
@@ -28,10 +27,9 @@ use crate::{ColumnStatistics, Statistics};
28
27
29
28
use arrow:: array:: Array ;
30
29
use arrow:: datatypes:: Schema ;
31
- use arrow:: ipc:: writer:: { FileWriter , IpcWriteOptions } ;
32
30
use arrow:: record_batch:: RecordBatch ;
33
31
use datafusion_common:: stats:: Precision ;
34
- use datafusion_common:: { plan_err, DataFusionError , Result } ;
32
+ use datafusion_common:: { plan_err, Result } ;
35
33
use datafusion_execution:: memory_pool:: MemoryReservation ;
36
34
37
35
use futures:: { StreamExt , TryStreamExt } ;
@@ -180,77 +178,6 @@ pub fn compute_record_batch_statistics(
180
178
}
181
179
}
182
180
183
- /// Write in Arrow IPC File format.
184
- pub struct IPCWriter {
185
- /// Path
186
- pub path : PathBuf ,
187
- /// Inner writer
188
- pub writer : FileWriter < File > ,
189
- /// Batches written
190
- pub num_batches : usize ,
191
- /// Rows written
192
- pub num_rows : usize ,
193
- /// Bytes written
194
- pub num_bytes : usize ,
195
- }
196
-
197
- impl IPCWriter {
198
- /// Create new writer
199
- pub fn new ( path : & Path , schema : & Schema ) -> Result < Self > {
200
- let file = File :: create ( path) . map_err ( |e| {
201
- DataFusionError :: Execution ( format ! (
202
- "Failed to create partition file at {path:?}: {e:?}"
203
- ) )
204
- } ) ?;
205
- Ok ( Self {
206
- num_batches : 0 ,
207
- num_rows : 0 ,
208
- num_bytes : 0 ,
209
- path : path. into ( ) ,
210
- writer : FileWriter :: try_new ( file, schema) ?,
211
- } )
212
- }
213
-
214
- /// Create new writer with IPC write options
215
- pub fn new_with_options (
216
- path : & Path ,
217
- schema : & Schema ,
218
- write_options : IpcWriteOptions ,
219
- ) -> Result < Self > {
220
- let file = File :: create ( path) . map_err ( |e| {
221
- DataFusionError :: Execution ( format ! (
222
- "Failed to create partition file at {path:?}: {e:?}"
223
- ) )
224
- } ) ?;
225
- Ok ( Self {
226
- num_batches : 0 ,
227
- num_rows : 0 ,
228
- num_bytes : 0 ,
229
- path : path. into ( ) ,
230
- writer : FileWriter :: try_new_with_options ( file, schema, write_options) ?,
231
- } )
232
- }
233
- /// Write one single batch
234
- pub fn write ( & mut self , batch : & RecordBatch ) -> Result < ( ) > {
235
- self . writer . write ( batch) ?;
236
- self . num_batches += 1 ;
237
- self . num_rows += batch. num_rows ( ) ;
238
- let num_bytes: usize = batch. get_array_memory_size ( ) ;
239
- self . num_bytes += num_bytes;
240
- Ok ( ( ) )
241
- }
242
-
243
- /// Finish the writer
244
- pub fn finish ( & mut self ) -> Result < ( ) > {
245
- self . writer . finish ( ) . map_err ( Into :: into)
246
- }
247
-
248
- /// Path write to
249
- pub fn path ( & self ) -> & Path {
250
- & self . path
251
- }
252
- }
253
-
254
181
/// Checks if the given projection is valid for the given schema.
255
182
pub fn can_project (
256
183
schema : & arrow:: datatypes:: SchemaRef ,
0 commit comments