23
23
import org .finos .tracdap .common .data .ArrowVsrContext ;
24
24
import org .finos .tracdap .common .data .DataPipeline ;
25
25
import org .finos .tracdap .common .exception .EUnexpected ;
26
+ import org .slf4j .Logger ;
27
+ import org .slf4j .LoggerFactory ;
26
28
27
29
import java .util .ArrayList ;
28
30
@@ -35,6 +37,7 @@ public class RangeSelector
35
37
DataPipeline .DataConsumer <DataPipeline .ArrowApi >,
36
38
DataPipeline .DataProducer <DataPipeline .ArrowApi > {
37
39
40
+ private static final Logger log = LoggerFactory .getLogger (RangeSelector .class );
38
41
private final long offset ;
39
42
private final long limit ;
40
43
private long currentRow ;
@@ -123,24 +126,25 @@ public void onBatch() {
123
126
124
127
sliceRoot .setRowCount (batchSize );
125
128
sliceRoot .setLoaded ();
126
- consumer ().onBatch ();
127
129
}
128
130
else if (batchEndRow >= offset && (batchStartRow < offset + limit || limit == 0 )) {
129
131
130
- var sliceStart = (int ) (offset - batchStartRow );
132
+ var sliceStart = (int ) Math . max (offset - batchStartRow , 0 );
131
133
var sliceEnd = (int ) Math .min (offset + limit - batchStartRow , batchSize );
132
134
var sliceLength = sliceEnd - sliceStart ;
133
135
134
136
sliceTransfers .forEach (slice -> slice .splitAndTransfer (sliceStart , sliceLength ));
135
137
136
138
sliceRoot .setRowCount (sliceLength );
137
139
sliceRoot .setLoaded ();
138
- consumer ().onBatch ();
139
140
}
140
141
141
142
// Always consume the incoming data
142
143
incomingRoot .setUnloaded ();
143
144
currentRow += batchSize ;
145
+
146
+ if (sliceRoot .readyToUnload ())
147
+ consumer ().onBatch ();
144
148
}
145
149
146
150
@ Override
0 commit comments