Skip to content

Commit 7670b07

Browse files
committed
refactor: TableScan's plan_files method now operates on manifest files and manifest data files concurrently rather than sequentially
1 parent 1aa05e0 commit 7670b07

File tree

3 files changed

+194
-85
lines changed

3 files changed

+194
-85
lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub struct ArrowReaderBuilder {
5252
field_ids: Vec<usize>,
5353
file_io: FileIO,
5454
schema: SchemaRef,
55-
predicates: Option<BoundPredicate>,
55+
predicate: Option<BoundPredicate>,
5656
}
5757

5858
impl ArrowReaderBuilder {
@@ -63,7 +63,7 @@ impl ArrowReaderBuilder {
6363
field_ids: vec![],
6464
file_io,
6565
schema,
66-
predicates: None,
66+
predicate: None,
6767
}
6868
}
6969

@@ -82,7 +82,7 @@ impl ArrowReaderBuilder {
8282

8383
/// Sets the predicates to apply to the scan.
8484
pub fn with_predicates(mut self, predicates: BoundPredicate) -> Self {
85-
self.predicates = Some(predicates);
85+
self.predicate = Some(predicates);
8686
self
8787
}
8888

@@ -93,7 +93,7 @@ impl ArrowReaderBuilder {
9393
field_ids: self.field_ids,
9494
schema: self.schema,
9595
file_io: self.file_io,
96-
predicates: self.predicates,
96+
predicate: self.predicate,
9797
}
9898
}
9999
}
@@ -105,7 +105,7 @@ pub struct ArrowReader {
105105
#[allow(dead_code)]
106106
schema: SchemaRef,
107107
file_io: FileIO,
108-
predicates: Option<BoundPredicate>,
108+
predicate: Option<BoundPredicate>,
109109
}
110110

111111
impl ArrowReader {
@@ -118,7 +118,7 @@ impl ArrowReader {
118118
let mut collector = CollectFieldIdVisitor {
119119
field_ids: HashSet::default(),
120120
};
121-
if let Some(predicates) = &self.predicates {
121+
if let Some(predicates) = &self.predicate {
122122
visit(&mut collector, predicates)?;
123123
}
124124

@@ -162,7 +162,7 @@ impl ArrowReader {
162162
&self,
163163
parquet_schema: &SchemaDescriptor,
164164
arrow_schema: &ArrowSchemaRef,
165-
) -> crate::Result<ProjectionMask> {
165+
) -> Result<ProjectionMask> {
166166
if self.field_ids.is_empty() {
167167
Ok(ProjectionMask::all())
168168
} else {
@@ -232,7 +232,7 @@ impl ArrowReader {
232232
parquet_schema: &SchemaDescriptor,
233233
collector: &CollectFieldIdVisitor,
234234
) -> Result<Option<RowFilter>> {
235-
if let Some(predicates) = &self.predicates {
235+
if let Some(predicate) = &self.predicate {
236236
let field_id_map = build_field_id_map(parquet_schema)?;
237237

238238
// Collect Parquet column indices from field ids.
@@ -255,7 +255,7 @@ impl ArrowReader {
255255
// After collecting required leaf column indices used in the predicate,
256256
// creates the projection mask for the Arrow predicates.
257257
let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
258-
let predicate_func = visit(&mut converter, predicates)?;
258+
let predicate_func = visit(&mut converter, predicate)?;
259259
let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
260260
Ok(Some(RowFilter::new(vec![Box::new(arrow_predicate)])))
261261
} else {

crates/iceberg/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,12 @@ define_from_err!(
333333

334334
define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
335335

336+
define_from_err!(
337+
futures::channel::mpsc::SendError,
338+
ErrorKind::Unexpected,
339+
"Failed to send a message to a channel"
340+
);
341+
336342
/// Helper macro to check arguments.
337343
///
338344
///

crates/iceberg/src/scan.rs

Lines changed: 179 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,24 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
2424
use crate::expr::{Bind, BoundPredicate, Predicate};
2525
use crate::io::FileIO;
2626
use crate::spec::{
27-
DataContentType, ManifestContentType, ManifestFile, Schema, SchemaRef, SnapshotRef,
28-
TableMetadataRef,
27+
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, Schema,
28+
SchemaRef, SnapshotRef, TableMetadataRef,
2929
};
3030
use crate::table::Table;
3131
use crate::{Error, ErrorKind, Result};
3232
use arrow_array::RecordBatch;
33-
use async_stream::try_stream;
33+
use futures::channel::mpsc::{channel, Sender};
3434
use futures::stream::BoxStream;
35-
use futures::StreamExt;
35+
use futures::{SinkExt, StreamExt, TryStreamExt};
3636
use serde::{Deserialize, Serialize};
3737
use std::collections::hash_map::Entry;
3838
use std::collections::HashMap;
3939
use std::sync::Arc;
40+
use tokio::spawn;
41+
42+
const CHANNEL_BUFFER_SIZE: usize = 10;
43+
const CONCURRENCY_LIMIT_MANIFEST_FILES: usize = 10;
44+
const SUPPORTED_MANIFEST_FILE_CONTENT_TYPES: [ManifestContentType; 1] = [ManifestContentType::Data];
4045

4146
/// A stream of [`FileScanTask`].
4247
pub type FileScanTaskStream = BoxStream<'static, Result<FileScanTask>>;
@@ -207,76 +212,20 @@ impl TableScan {
207212
self.case_sensitive,
208213
)?;
209214

210-
let mut partition_filter_cache = PartitionFilterCache::new();
211-
let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
212-
213-
Ok(try_stream! {
214-
let manifest_list = context
215-
.snapshot
216-
.load_manifest_list(&context.file_io, &context.table_metadata)
217-
.await?;
218-
219-
for entry in manifest_list.entries() {
220-
if !Self::content_type_is_data(entry) {
221-
continue;
222-
}
223-
224-
let partition_spec_id = entry.partition_spec_id;
225-
226-
let partition_filter = partition_filter_cache.get(
227-
partition_spec_id,
228-
&context,
229-
)?;
215+
let manifest_list = context
216+
.snapshot
217+
.load_manifest_list(&context.file_io, &context.table_metadata)
218+
.await?;
230219

231-
if let Some(partition_filter) = partition_filter {
232-
let manifest_evaluator = manifest_evaluator_cache.get(
233-
partition_spec_id,
234-
partition_filter,
235-
);
220+
let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
236221

237-
if !manifest_evaluator.eval(entry)? {
238-
continue;
239-
}
240-
}
222+
spawn(async move {
223+
let _ = ConcurrentFileScanStreamContext::new(context, sender)
224+
.run(manifest_list)
225+
.await;
226+
});
241227

242-
let manifest = entry.load_manifest(&context.file_io).await?;
243-
let mut manifest_entries_stream =
244-
futures::stream::iter(manifest.entries().iter().filter(|e| e.is_alive()));
245-
246-
while let Some(manifest_entry) = manifest_entries_stream.next().await {
247-
// TODO: Apply ExpressionEvaluator
248-
249-
if let Some(bound_predicate) = context.bound_filter() {
250-
// reject any manifest entries whose data file's metrics don't match the filter.
251-
if !InclusiveMetricsEvaluator::eval(
252-
bound_predicate,
253-
manifest_entry.data_file(),
254-
false
255-
)? {
256-
continue;
257-
}
258-
}
259-
260-
match manifest_entry.content_type() {
261-
DataContentType::EqualityDeletes | DataContentType::PositionDeletes => {
262-
yield Err(Error::new(
263-
ErrorKind::FeatureUnsupported,
264-
"Delete files are not supported yet.",
265-
))?;
266-
}
267-
DataContentType::Data => {
268-
let scan_task: Result<FileScanTask> = Ok(FileScanTask {
269-
data_file_path: manifest_entry.data_file().file_path().to_string(),
270-
start: 0,
271-
length: manifest_entry.file_size_in_bytes(),
272-
});
273-
yield scan_task?;
274-
}
275-
}
276-
}
277-
}
278-
}
279-
.boxed())
228+
return Ok(receiver.boxed());
280229
}
281230

282231
/// Returns an [`ArrowRecordBatchStream`].
@@ -334,13 +283,167 @@ impl TableScan {
334283

335284
arrow_reader_builder.build().read(self.plan_files().await?)
336285
}
286+
}
287+
288+
#[derive(Debug)]
289+
struct ConcurrentFileScanStreamContext {
290+
context: Arc<FileScanStreamContext>,
291+
sender: Sender<Result<FileScanTask>>,
292+
manifest_evaluator_cache: ManifestEvaluatorCache,
293+
partition_filter_cache: PartitionFilterCache,
294+
}
295+
296+
impl ConcurrentFileScanStreamContext {
297+
fn new(context: FileScanStreamContext, sender: Sender<Result<FileScanTask>>) -> Self {
298+
ConcurrentFileScanStreamContext {
299+
context: Arc::new(context),
300+
sender,
301+
manifest_evaluator_cache: ManifestEvaluatorCache::new(),
302+
partition_filter_cache: PartitionFilterCache::new(),
303+
}
304+
}
305+
306+
async fn run(&mut self, manifest_list: ManifestList) -> Result<()> {
307+
let file_io = self.context.file_io.clone();
308+
let sender = self.sender.clone();
309+
let context = self.context.clone();
310+
311+
// This whole Vec-and-for-loop approach feels sub-optimally structured.
312+
// I've tried structuring this in multiple ways but run into
313+
// issues with ownership. Ideally I'd like to structure this
314+
// with a functional programming approach: extracting
315+
// sections 1, 2, and 3 out into different methods on Self,
316+
// and then use some iterator combinators to orchestrate it all.
317+
// Section 1 is pretty trivially refactorable into a static method
318+
// that can be used in a closure that can be used with Iterator::filter.
319+
// Similarly, section 3 seems easily factor-able into a method that can
320+
// be passed into Iterator::map.
321+
// Section 2 turns out trickier - we want to exit the entire `run` method early
322+
// if the eval fails, and filter out any manifest_files from the iterator / stream
323+
// if the eval succeeds but returns true. We bump into ownership issues due
324+
// to needing to pass mut self as the caches need to be able to mutate.
325+
// Section 3 runs into ownership issues when trying to refactor its closure to be
326+
// a static or non-static method.
327+
328+
// 1
329+
let filtered_manifest_files = manifest_list
330+
.entries()
331+
.iter()
332+
.filter(Self::reject_unsupported_content_types);
333+
334+
// 2
335+
let mut filtered_manifest_files2 = vec![];
336+
for manifest_file in filtered_manifest_files {
337+
if !self.apply_evaluator(manifest_file)? {
338+
continue;
339+
}
340+
341+
filtered_manifest_files2.push(manifest_file);
342+
}
343+
344+
// 3
345+
let filtered_manifest_files = filtered_manifest_files2.into_iter().map(|manifest_file| {
346+
Ok((
347+
manifest_file,
348+
file_io.clone(),
349+
sender.clone(),
350+
context.clone(),
351+
))
352+
});
353+
354+
futures::stream::iter(filtered_manifest_files)
355+
.try_for_each_concurrent(
356+
CONCURRENCY_LIMIT_MANIFEST_FILES,
357+
Self::process_manifest_file,
358+
)
359+
.await
360+
}
361+
362+
fn reject_unsupported_content_types(manifest_file: &&ManifestFile) -> bool {
363+
SUPPORTED_MANIFEST_FILE_CONTENT_TYPES.contains(&manifest_file.content)
364+
}
365+
366+
fn apply_evaluator(&mut self, manifest_file: &ManifestFile) -> Result<bool> {
367+
let partition_spec_id = manifest_file.partition_spec_id;
368+
369+
let partition_filter = self
370+
.partition_filter_cache
371+
.get(partition_spec_id, &self.context)?;
372+
373+
if let Some(partition_filter) = partition_filter {
374+
let manifest_evaluator = self
375+
.manifest_evaluator_cache
376+
.get(partition_spec_id, partition_filter);
377+
378+
if !manifest_evaluator.eval(manifest_file)? {
379+
return Ok(false);
380+
}
381+
}
382+
383+
Ok(true)
384+
}
385+
386+
async fn process_manifest_file(
387+
manifest_and_file_io_and_sender: (
388+
&ManifestFile,
389+
FileIO,
390+
Sender<Result<FileScanTask>>,
391+
Arc<FileScanStreamContext>,
392+
),
393+
) -> Result<()> {
394+
let (manifest_file, file_io, mut sender, context) = manifest_and_file_io_and_sender;
395+
396+
let manifest = manifest_file.load_manifest(&file_io).await?;
397+
for manifest_entry in manifest.entries() {
398+
if !manifest_entry.is_alive() {
399+
continue;
400+
}
401+
402+
Self::reject_unsupported_manifest_entry_content_types(manifest_entry)?;
403+
404+
if let Some(bound_predicate) = context.bound_filter() {
405+
// reject any manifest entries whose data file's metrics don't match the filter.
406+
if !InclusiveMetricsEvaluator::eval(
407+
bound_predicate,
408+
manifest_entry.data_file(),
409+
false,
410+
)? {
411+
return Ok(());
412+
}
413+
}
414+
415+
// TODO: Apply ExpressionEvaluator
416+
417+
sender
418+
.send(Ok(Self::manifest_entry_to_file_scan_task(manifest_entry)))
419+
.await?;
420+
}
421+
422+
Ok(())
423+
}
424+
425+
fn reject_unsupported_manifest_entry_content_types(
426+
manifest_entry: &ManifestEntryRef,
427+
) -> Result<()> {
428+
if matches!(manifest_entry.content_type(), DataContentType::Data) {
429+
Ok(())
430+
} else {
431+
Err(Error::new(
432+
ErrorKind::FeatureUnsupported,
433+
format!(
434+
"Files of type '{:?}' are not supported yet.",
435+
manifest_entry.content_type()
436+
),
437+
))
438+
}
439+
}
337440

338-
/// Checks whether the [`ManifestContentType`] is `Data` or not.
339-
fn content_type_is_data(entry: &ManifestFile) -> bool {
340-
if let ManifestContentType::Data = entry.content {
341-
return true;
441+
fn manifest_entry_to_file_scan_task(manifest_entry: &ManifestEntryRef) -> FileScanTask {
442+
FileScanTask {
443+
data_file_path: manifest_entry.file_path().to_string(),
444+
start: 0,
445+
length: manifest_entry.file_size_in_bytes(),
342446
}
343-
false
344447
}
345448
}
346449

0 commit comments

Comments
 (0)