@@ -21,10 +21,8 @@ use arrow_array::cast::AsArray;
21
21
use arrow_array:: Array ;
22
22
use arrow_array:: { RecordBatch , RecordBatchReader } ;
23
23
use arrow_schema:: { ArrowError , DataType as ArrowType , Schema , SchemaRef } ;
24
- use arrow_select:: filter:: prep_null_mask_filter;
25
24
pub use filter:: { ArrowPredicate , ArrowPredicateFn , RowFilter } ;
26
25
pub use selection:: { RowSelection , RowSelector } ;
27
- use std:: collections:: VecDeque ;
28
26
use std:: sync:: Arc ;
29
27
30
28
pub use crate :: arrow:: array_reader:: RowGroups ;
@@ -39,7 +37,10 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
39
37
use crate :: file:: reader:: { ChunkReader , SerializedPageReader } ;
40
38
use crate :: schema:: types:: SchemaDescriptor ;
41
39
40
+ use read_plan:: { ReadPlan , ReadPlanBuilder } ;
41
+
42
42
mod filter;
43
+ pub ( crate ) mod read_plan;
43
44
mod selection;
44
45
pub mod statistics;
45
46
@@ -679,38 +680,33 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
679
680
} ;
680
681
681
682
let mut filter = self . filter ;
682
- let mut selection = self . selection ;
683
+ let mut plan_builder = ReadPlanBuilder :: new ( batch_size ) . with_selection ( self . selection ) ;
683
684
685
+ // Update selection based on any filters
684
686
if let Some ( filter) = filter. as_mut ( ) {
685
687
for predicate in filter. predicates . iter_mut ( ) {
686
- if !selects_any ( selection. as_ref ( ) ) {
688
+ // break early if we have already ruled out all rows
689
+ if !plan_builder. selects_any ( ) {
687
690
break ;
688
691
}
689
692
693
+ // TODO move this into the read_plan
690
694
let array_reader =
691
695
build_array_reader ( self . fields . as_deref ( ) , predicate. projection ( ) , & reader) ?;
692
696
693
- selection = Some ( evaluate_predicate (
694
- batch_size,
695
- array_reader,
696
- selection,
697
- predicate. as_mut ( ) ,
698
- ) ?) ;
697
+ plan_builder = plan_builder. with_predicate ( array_reader, predicate. as_mut ( ) ) ?;
699
698
}
700
699
}
701
700
702
701
let array_reader = build_array_reader ( self . fields . as_deref ( ) , & self . projection , & reader) ?;
702
+ let read_plan = plan_builder
703
+ . limited ( reader. num_rows ( ) )
704
+ . with_offset ( self . offset )
705
+ . with_limit ( self . limit )
706
+ . build_limited ( )
707
+ . build ( ) ;
703
708
704
- // If selection is empty, truncate
705
- if !selects_any ( selection. as_ref ( ) ) {
706
- selection = Some ( RowSelection :: from ( vec ! [ ] ) ) ;
707
- }
708
-
709
- Ok ( ParquetRecordBatchReader :: new (
710
- batch_size,
711
- array_reader,
712
- apply_range ( selection, reader. num_rows ( ) , self . offset , self . limit ) ,
713
- ) )
709
+ Ok ( ParquetRecordBatchReader :: new ( array_reader, read_plan) )
714
710
}
715
711
}
716
712
@@ -789,11 +785,9 @@ impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
789
785
/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
790
786
/// read from a parquet data source
791
787
pub struct ParquetRecordBatchReader {
792
- batch_size : usize ,
793
788
array_reader : Box < dyn ArrayReader > ,
794
789
schema : SchemaRef ,
795
- /// Row ranges to be selected from the data source
796
- selection : Option < VecDeque < RowSelector > > ,
790
+ read_plan : ReadPlan ,
797
791
}
798
792
799
793
impl Iterator for ParquetRecordBatchReader {
@@ -814,9 +808,10 @@ impl ParquetRecordBatchReader {
814
808
/// simplify error handling with `?`
815
809
fn next_inner ( & mut self ) -> Result < Option < RecordBatch > > {
816
810
let mut read_records = 0 ;
817
- match self . selection . as_mut ( ) {
811
+ let batch_size = self . batch_size ( ) ;
812
+ match self . read_plan . selection_mut ( ) {
818
813
Some ( selection) => {
819
- while read_records < self . batch_size && !selection. is_empty ( ) {
814
+ while read_records < batch_size && !selection. is_empty ( ) {
820
815
let front = selection. pop_front ( ) . unwrap ( ) ;
821
816
if front. skip {
822
817
let skipped = self . array_reader . skip_records ( front. row_count ) ?;
@@ -838,7 +833,7 @@ impl ParquetRecordBatchReader {
838
833
}
839
834
840
835
// try to read record
841
- let need_read = self . batch_size - read_records;
836
+ let need_read = batch_size - read_records;
842
837
let to_read = match front. row_count . checked_sub ( need_read) {
843
838
Some ( remaining) if remaining != 0 => {
844
839
// if page row count less than batch_size we must set batch size to page row count.
@@ -855,7 +850,7 @@ impl ParquetRecordBatchReader {
855
850
}
856
851
}
857
852
None => {
858
- self . array_reader . read_records ( self . batch_size ) ?;
853
+ self . array_reader . read_records ( self . batch_size ( ) ) ?;
859
854
}
860
855
} ;
861
856
@@ -905,116 +900,37 @@ impl ParquetRecordBatchReader {
905
900
let array_reader =
906
901
build_array_reader ( levels. levels . as_ref ( ) , & ProjectionMask :: all ( ) , row_groups) ?;
907
902
903
+ let read_plan = ReadPlanBuilder :: new ( batch_size)
904
+ . with_selection ( selection)
905
+ . build ( ) ;
906
+
908
907
Ok ( Self {
909
- batch_size,
910
908
array_reader,
911
909
schema : Arc :: new ( Schema :: new ( levels. fields . clone ( ) ) ) ,
912
- selection : selection . map ( |s| s . trim ( ) . into ( ) ) ,
910
+ read_plan ,
913
911
} )
914
912
}
915
913
916
914
/// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
917
915
/// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
918
916
/// all rows will be returned
919
- pub ( crate ) fn new (
920
- batch_size : usize ,
921
- array_reader : Box < dyn ArrayReader > ,
922
- selection : Option < RowSelection > ,
923
- ) -> Self {
917
+ pub ( crate ) fn new ( array_reader : Box < dyn ArrayReader > , read_plan : ReadPlan ) -> Self {
924
918
let schema = match array_reader. get_data_type ( ) {
925
919
ArrowType :: Struct ( ref fields) => Schema :: new ( fields. clone ( ) ) ,
926
920
_ => unreachable ! ( "Struct array reader's data type is not struct!" ) ,
927
921
} ;
928
922
929
923
Self {
930
- batch_size,
931
924
array_reader,
932
925
schema : Arc :: new ( schema) ,
933
- selection : selection . map ( |s| s . trim ( ) . into ( ) ) ,
926
+ read_plan ,
934
927
}
935
928
}
936
- }
937
929
938
- /// Returns `true` if `selection` is `None` or selects some rows
939
- pub ( crate ) fn selects_any ( selection : Option < & RowSelection > ) -> bool {
940
- selection. map ( |x| x. selects_any ( ) ) . unwrap_or ( true )
941
- }
942
-
943
- /// Applies an optional offset and limit to an optional [`RowSelection`]
944
- pub ( crate ) fn apply_range (
945
- mut selection : Option < RowSelection > ,
946
- row_count : usize ,
947
- offset : Option < usize > ,
948
- limit : Option < usize > ,
949
- ) -> Option < RowSelection > {
950
- // If an offset is defined, apply it to the `selection`
951
- if let Some ( offset) = offset {
952
- selection = Some ( match row_count. checked_sub ( offset) {
953
- None => RowSelection :: from ( vec ! [ ] ) ,
954
- Some ( remaining) => selection
955
- . map ( |selection| selection. offset ( offset) )
956
- . unwrap_or_else ( || {
957
- RowSelection :: from ( vec ! [
958
- RowSelector :: skip( offset) ,
959
- RowSelector :: select( remaining) ,
960
- ] )
961
- } ) ,
962
- } ) ;
963
- }
964
-
965
- // If a limit is defined, apply it to the final `selection`
966
- if let Some ( limit) = limit {
967
- selection = Some (
968
- selection
969
- . map ( |selection| selection. limit ( limit) )
970
- . unwrap_or_else ( || {
971
- RowSelection :: from ( vec ! [ RowSelector :: select( limit. min( row_count) ) ] )
972
- } ) ,
973
- ) ;
974
- }
975
- selection
976
- }
977
-
978
- /// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
979
- /// which rows to return.
980
- ///
981
- /// `input_selection`: Optional pre-existing selection. If `Some`, then the
982
- /// final [`RowSelection`] will be the conjunction of it and the rows selected
983
- /// by `predicate`.
984
- ///
985
- /// Note: A pre-existing selection may come from evaluating a previous predicate
986
- /// or if the [`ParquetRecordBatchReader`] specified an explicit
987
- /// [`RowSelection`] in addition to one or more predicates.
988
- pub ( crate ) fn evaluate_predicate (
989
- batch_size : usize ,
990
- array_reader : Box < dyn ArrayReader > ,
991
- input_selection : Option < RowSelection > ,
992
- predicate : & mut dyn ArrowPredicate ,
993
- ) -> Result < RowSelection > {
994
- let reader = ParquetRecordBatchReader :: new ( batch_size, array_reader, input_selection. clone ( ) ) ;
995
- let mut filters = vec ! [ ] ;
996
- for maybe_batch in reader {
997
- let maybe_batch = maybe_batch?;
998
- let input_rows = maybe_batch. num_rows ( ) ;
999
- let filter = predicate. evaluate ( maybe_batch) ?;
1000
- // Since user supplied predicate, check error here to catch bugs quickly
1001
- if filter. len ( ) != input_rows {
1002
- return Err ( arrow_err ! (
1003
- "ArrowPredicate predicate returned {} rows, expected {input_rows}" ,
1004
- filter. len( )
1005
- ) ) ;
1006
- }
1007
- match filter. null_count ( ) {
1008
- 0 => filters. push ( filter) ,
1009
- _ => filters. push ( prep_null_mask_filter ( & filter) ) ,
1010
- } ;
930
+ #[ inline( always) ]
931
+ pub ( crate ) fn batch_size ( & self ) -> usize {
932
+ self . read_plan . batch_size ( )
1011
933
}
1012
-
1013
- let raw = RowSelection :: from_filters ( & filters) ;
1014
- Ok ( match input_selection {
1015
- Some ( selection) => selection. and_then ( & raw ) ,
1016
- None => raw,
1017
- } )
1018
934
}
1019
935
1020
936
#[ cfg( test) ]
@@ -3993,7 +3909,7 @@ mod tests {
3993
3909
. build ( )
3994
3910
. unwrap ( ) ;
3995
3911
assert_ne ! ( 1024 , num_rows) ;
3996
- assert_eq ! ( reader. batch_size, num_rows as usize ) ;
3912
+ assert_eq ! ( reader. read_plan . batch_size( ) , num_rows as usize ) ;
3997
3913
}
3998
3914
3999
3915
#[ test]
0 commit comments