@@ -18,7 +18,7 @@ use async_trait::async_trait;
1818use snafu:: ensure;
1919
2020use crate :: inverted_index:: create:: sort:: { SortOutput , Sorter } ;
21- use crate :: inverted_index:: create:: IndexCreator ;
21+ use crate :: inverted_index:: create:: InvertedIndexCreator ;
2222use crate :: inverted_index:: error:: { InconsistentRowCountSnafu , Result } ;
2323use crate :: inverted_index:: format:: writer:: InvertedIndexWriter ;
2424use crate :: inverted_index:: Bytes ;
@@ -38,15 +38,12 @@ pub struct SortIndexCreator {
3838 /// Map of index names to sorters
3939 sorters : HashMap < IndexName , Box < dyn Sorter > > ,
4040
41- /// Writer for inverted index data
42- index_writer : Box < dyn InvertedIndexWriter > ,
43-
4441 /// Number of rows in each segment, used to produce sorters
4542 segment_row_count : usize ,
4643}
4744
4845#[ async_trait]
49- impl IndexCreator for SortIndexCreator {
46+ impl InvertedIndexCreator for SortIndexCreator {
5047 /// Inserts a value or null into the sorter for the specified index
5148 async fn push_with_name ( & mut self , index_name : & str , value : Option < Bytes > ) -> Result < ( ) > {
5249 match self . sorters . get_mut ( index_name) {
@@ -62,7 +59,7 @@ impl IndexCreator for SortIndexCreator {
6259 }
6360
6461 /// Finalizes the sorting for all indexes and writes them using the inverted index writer
65- async fn finish ( & mut self ) -> Result < ( ) > {
62+ async fn finish ( & mut self , writer : & mut dyn InvertedIndexWriter ) -> Result < ( ) > {
6663 let mut row_count = None ;
6764
6865 for ( index_name, mut sorter) in self . sorters . drain ( ) {
@@ -82,12 +79,12 @@ impl IndexCreator for SortIndexCreator {
8279 }
8380 ) ;
8481
85- self . index_writer
82+ writer
8683 . add_index ( index_name, null_bitmap, sorted_stream)
8784 . await ?;
8885 }
8986
90- self . index_writer
87+ writer
9188 . finish (
9289 row_count. unwrap_or_default ( ) as _ ,
9390 self . segment_row_count as _ ,
@@ -98,15 +95,10 @@ impl IndexCreator for SortIndexCreator {
9895
9996impl SortIndexCreator {
10097 /// Creates a new `SortIndexCreator` with the given sorter factory and index writer
101- pub fn new (
102- sorter_factory : SorterFactory ,
103- index_writer : Box < dyn InvertedIndexWriter > ,
104- segment_row_count : usize ,
105- ) -> Self {
98+ pub fn new ( sorter_factory : SorterFactory , segment_row_count : usize ) -> Self {
10699 Self {
107100 sorter_factory,
108101 sorters : HashMap :: new ( ) ,
109- index_writer,
110102 segment_row_count,
111103 }
112104 }
@@ -132,6 +124,23 @@ mod tests {
132124
133125 #[ tokio:: test]
134126 async fn test_sort_index_creator_basic ( ) {
127+ let mut creator = SortIndexCreator :: new ( NaiveSorter :: factory ( ) , 1 ) ;
128+
129+ let index_values = vec ! [
130+ ( "a" , vec![ b"3" , b"2" , b"1" ] ) ,
131+ ( "b" , vec![ b"6" , b"5" , b"4" ] ) ,
132+ ( "c" , vec![ b"1" , b"2" , b"3" ] ) ,
133+ ] ;
134+
135+ for ( index_name, values) in index_values {
136+ for value in values {
137+ creator
138+ . push_with_name ( index_name, Some ( value. into ( ) ) )
139+ . await
140+ . unwrap ( ) ;
141+ }
142+ }
143+
135144 let mut mock_writer = MockInvertedIndexWriter :: new ( ) ;
136145 mock_writer
137146 . expect_add_index ( )
@@ -155,12 +164,17 @@ mod tests {
155164 Ok ( ( ) )
156165 } ) ;
157166
158- let mut creator = SortIndexCreator :: new ( NaiveSorter :: factory ( ) , Box :: new ( mock_writer) , 1 ) ;
167+ creator. finish ( & mut mock_writer) . await . unwrap ( ) ;
168+ }
169+
170+ #[ tokio:: test]
171+ async fn test_sort_index_creator_inconsistent_row_count ( ) {
172+ let mut creator = SortIndexCreator :: new ( NaiveSorter :: factory ( ) , 1 ) ;
159173
160174 let index_values = vec ! [
161175 ( "a" , vec![ b"3" , b"2" , b"1" ] ) ,
162176 ( "b" , vec![ b"6" , b"5" , b"4" ] ) ,
163- ( "c" , vec![ b"1" , b"2" , b"3" ] ) ,
177+ ( "c" , vec![ b"1" , b"2" ] ) ,
164178 ] ;
165179
166180 for ( index_name, values) in index_values {
@@ -172,11 +186,6 @@ mod tests {
172186 }
173187 }
174188
175- creator. finish ( ) . await . unwrap ( ) ;
176- }
177-
178- #[ tokio:: test]
179- async fn test_sort_index_creator_inconsistant_row_count ( ) {
180189 let mut mock_writer = MockInvertedIndexWriter :: new ( ) ;
181190 mock_writer
182191 . expect_add_index ( )
@@ -192,24 +201,7 @@ mod tests {
192201 } ) ;
193202 mock_writer. expect_finish ( ) . never ( ) ;
194203
195- let mut creator = SortIndexCreator :: new ( NaiveSorter :: factory ( ) , Box :: new ( mock_writer) , 1 ) ;
196-
197- let index_values = vec ! [
198- ( "a" , vec![ b"3" , b"2" , b"1" ] ) ,
199- ( "b" , vec![ b"6" , b"5" , b"4" ] ) ,
200- ( "c" , vec![ b"1" , b"2" ] ) ,
201- ] ;
202-
203- for ( index_name, values) in index_values {
204- for value in values {
205- creator
206- . push_with_name ( index_name, Some ( value. into ( ) ) )
207- . await
208- . unwrap ( ) ;
209- }
210- }
211-
212- let res = creator. finish ( ) . await ;
204+ let res = creator. finish ( & mut mock_writer) . await ;
213205 assert ! ( matches!( res, Err ( Error :: InconsistentRowCount { .. } ) ) ) ;
214206 }
215207
0 commit comments