Skip to content

Commit 3d754cb

Browse files
committed
refactor: TableScan's plan_files method now operates on manifest files and manifest data files concurrently rather than sequentially
1 parent 070576b commit 3d754cb

File tree

3 files changed: +213 -114 lines changed

3 files changed: +213 -114 lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ pub struct ArrowReaderBuilder {
5252
field_ids: Vec<usize>,
5353
file_io: FileIO,
5454
schema: SchemaRef,
55-
predicates: Option<BoundPredicate>,
55+
predicate: Option<BoundPredicate>,
5656
}
5757

5858
impl ArrowReaderBuilder {
@@ -63,7 +63,7 @@ impl ArrowReaderBuilder {
6363
field_ids: vec![],
6464
file_io,
6565
schema,
66-
predicates: None,
66+
predicate: None,
6767
}
6868
}
6969

@@ -82,7 +82,7 @@ impl ArrowReaderBuilder {
8282

8383
/// Sets the predicates to apply to the scan.
8484
pub fn with_predicates(mut self, predicates: BoundPredicate) -> Self {
85-
self.predicates = Some(predicates);
85+
self.predicate = Some(predicates);
8686
self
8787
}
8888

@@ -93,7 +93,7 @@ impl ArrowReaderBuilder {
9393
field_ids: self.field_ids,
9494
schema: self.schema,
9595
file_io: self.file_io,
96-
predicates: self.predicates,
96+
predicate: self.predicate,
9797
}
9898
}
9999
}
@@ -105,7 +105,7 @@ pub struct ArrowReader {
105105
#[allow(dead_code)]
106106
schema: SchemaRef,
107107
file_io: FileIO,
108-
predicates: Option<BoundPredicate>,
108+
predicate: Option<BoundPredicate>,
109109
}
110110

111111
impl ArrowReader {
@@ -118,7 +118,7 @@ impl ArrowReader {
118118
let mut collector = CollectFieldIdVisitor {
119119
field_ids: HashSet::default(),
120120
};
121-
if let Some(predicates) = &self.predicates {
121+
if let Some(predicates) = &self.predicate {
122122
visit(&mut collector, predicates)?;
123123
}
124124

@@ -162,7 +162,7 @@ impl ArrowReader {
162162
&self,
163163
parquet_schema: &SchemaDescriptor,
164164
arrow_schema: &ArrowSchemaRef,
165-
) -> crate::Result<ProjectionMask> {
165+
) -> Result<ProjectionMask> {
166166
if self.field_ids.is_empty() {
167167
Ok(ProjectionMask::all())
168168
} else {
@@ -232,7 +232,7 @@ impl ArrowReader {
232232
parquet_schema: &SchemaDescriptor,
233233
collector: &CollectFieldIdVisitor,
234234
) -> Result<Option<RowFilter>> {
235-
if let Some(predicates) = &self.predicates {
235+
if let Some(predicate) = &self.predicate {
236236
let field_id_map = build_field_id_map(parquet_schema)?;
237237

238238
// Collect Parquet column indices from field ids.
@@ -255,7 +255,7 @@ impl ArrowReader {
255255
// After collecting required leaf column indices used in the predicate,
256256
// creates the projection mask for the Arrow predicates.
257257
let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
258-
let predicate_func = visit(&mut converter, predicates)?;
258+
let predicate_func = visit(&mut converter, predicate)?;
259259
let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
260260
Ok(Some(RowFilter::new(vec![Box::new(arrow_predicate)])))
261261
} else {

crates/iceberg/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -333,6 +333,12 @@ define_from_err!(
333333

334334
define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
335335

336+
define_from_err!(
337+
futures::channel::mpsc::SendError,
338+
ErrorKind::Unexpected,
339+
"Failed to send a message to a channel"
340+
);
341+
336342
/// Helper macro to check arguments.
337343
///
338344
///

0 commit comments

Comments
 (0)