Skip to content

Commit 7c986d0

Browse files
Fix / Data pagination background load (#619)
* Add multi batch tests to codec test suite
* Make range selector synchronous and add a test case
* Only fail data tests on memory issues inside the TRAC services
* Remove double buffering
1 parent f66ff94 commit 7c986d0

File tree

18 files changed, with 255 additions and 206 deletions.

tracdap-libs/tracdap-lib-data/src/main/java/org/finos/tracdap/common/codec/StreamingDecoder.java

Lines changed: 2 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -43,24 +43,12 @@ public DataPipeline.StreamApi dataInterface() {
4343
@Override
4444
public boolean isReady() {
4545

46-
if (context != null)
47-
return context.readyToFlip() || context.readyToLoad();
48-
else
49-
return consumerReady();
46+
return consumerReady();
5047
}
5148

5249
@Override
5350
public void pump() {
5451

55-
if (context == null)
56-
return;
57-
58-
if (context.readyToFlip())
59-
context.flip();
60-
61-
if (context.readyToUnload() && consumerReady()) {
62-
consumer().onBatch();
63-
context.setUnloaded();
64-
}
52+
// No-op
6553
}
6654
}

tracdap-libs/tracdap-lib-data/src/main/java/org/finos/tracdap/common/codec/arrow/ArrowDecoder.java

Lines changed: 7 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -129,26 +129,17 @@ private boolean sendBatches() throws IOException {
129129
// Data arrives in one big buffer, so don't send it all at once
130130
// Push what the consumer requests, then wait for another call to pump()
131131

132-
while (context.readyToFlip() || context.readyToLoad()) {
132+
while (consumerReady()) {
133133

134-
if (context.readyToFlip())
135-
context.flip();
136-
137-
if (context.readyToLoad()) {
138-
139-
var batchAvailable = reader.loadNextBatch();
140-
141-
if (!batchAvailable)
142-
return true;
134+
var batchAvailable = reader.loadNextBatch();
143135

136+
if (batchAvailable) {
144137
context.setLoaded();
145-
146-
if (context.readyToFlip())
147-
context.flip();
148-
}
149-
150-
if (context.readyToUnload() && consumerReady())
151138
consumer().onBatch();
139+
}
140+
else {
141+
return true;
142+
}
152143
}
153144

154145
return false;

tracdap-libs/tracdap-lib-data/src/main/java/org/finos/tracdap/common/codec/arrow/ArrowFileEncoder.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,6 @@ public ArrowFileEncoder(BufferAllocator allocator) {
3434
@Override
3535
protected ArrowWriter createWriter(ArrowVsrContext context, BufferAllocator allocator) {
3636
var out = new ByteOutputChannel(context.getAllocator(), consumer()::onNext);
37-
return new ArrowFileWriterExt(context.getFrontBuffer(), context.getDictionaries(), out);
37+
return new ArrowFileWriterExt(context.getVsr(), context.getDictionaries(), out);
3838
}
3939
}

tracdap-libs/tracdap-lib-data/src/main/java/org/finos/tracdap/common/codec/arrow/ArrowStreamDecoder.java

Lines changed: 12 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -124,7 +124,7 @@ public void pump() {
124124

125125
private void sendBatches() throws IOException {
126126

127-
if (isDone())
127+
if (!consumerReady() || isDone())
128128
return;
129129

130130
if (context == null) {
@@ -143,37 +143,22 @@ else if (messageReader.hasEos()) {
143143
}
144144
}
145145

146-
if (context.readyToFlip())
147-
context.flip();
148-
149-
if (context.readyToLoad()) {
150-
151-
if (messageReader.hasMessage(MessageHeader.RecordBatch)) {
152-
153-
arrowReader.loadNextBatch();
154-
context.setLoaded();
155-
156-
if (context.readyToFlip())
157-
context.flip();
158-
}
159-
else if (messageReader.hasEos()) {
146+
while (consumerReady() && messageReader.hasMessage(MessageHeader.RecordBatch)) {
147+
arrowReader.loadNextBatch();
148+
consumer().onBatch();
149+
}
160150

161-
if (messageReader.hasMessage()) {
162-
var error = new EDataCorruption("Arrow decoding failed, unexpected messages after EOS marker");
163-
onError(error);
164-
return;
165-
}
151+
if (consumerReady() && messageReader.hasEos()) {
166152

167-
if (!isDone())
168-
markAsDone();
153+
if (messageReader.hasMessage()) {
154+
var error = new EDataCorruption("Arrow decoding failed, unexpected messages after EOS marker");
155+
onError(error);
156+
return;
169157
}
170-
}
171158

172-
if (context.readyToUnload() && consumerReady())
173-
consumer().onBatch();
174-
175-
if (isDone() && !context.readyToUnload())
159+
markAsDone();
176160
consumer().onComplete();
161+
}
177162
}
178163

179164
@Override

tracdap-libs/tracdap-lib-data/src/main/java/org/finos/tracdap/common/codec/arrow/ArrowStreamEncoder.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,6 @@ public ArrowStreamEncoder(BufferAllocator allocator) {
3434
@Override
3535
protected ArrowWriter createWriter(ArrowVsrContext context, BufferAllocator allocator) {
3636
var out = new ByteOutputChannel(context.getAllocator(), consumer()::onNext);
37-
return new ArrowStreamWriter(context.getFrontBuffer(), context.getDictionaries(), out);
37+
return new ArrowStreamWriter(context.getVsr(), context.getDictionaries(), out);
3838
}
3939
}

tracdap-libs/tracdap-lib-data/src/main/java/org/finos/tracdap/common/codec/text/BaseTextDecoder.java

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -205,11 +205,9 @@ boolean doParse() throws IOException {
205205
while (reader.readBatch()) {
206206

207207
context.setLoaded();
208-
context.flip();
209208
consumer().onBatch();
210-
context.setUnloaded();
211209

212-
reader.resetBatch(context.getBackBuffer());
210+
reader.resetBatch(context.getVsr());
213211
}
214212

215213
return reader.endOfStream();

tracdap-libs/tracdap-lib-data/src/main/java/org/finos/tracdap/common/codec/text/BaseTextEncoder.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -73,7 +73,7 @@ public void onStart(ArrowVsrContext context) {
7373
this.out = new ByteOutputStream(allocator, consumer()::onNext);
7474

7575
this.writer = new TextFileWriter(
76-
context.getFrontBuffer(),
76+
context.getVsr(),
7777
context.getDictionaries(),
7878
this.out,
7979
configForSchema(this.config, context.getSchema()));
@@ -112,7 +112,7 @@ public void onBatch() {
112112
if (log.isTraceEnabled())
113113
log.trace("JSON ENCODER: onNext()");
114114

115-
var batch = context.getFrontBuffer();
115+
var batch = context.getVsr();
116116

117117
writer.resetBatch(batch);
118118
writer.writeBatch();

tracdap-libs/tracdap-lib-data/src/main/java/org/finos/tracdap/common/codec/text/BufferedTextDecoder.java

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -172,11 +172,9 @@ boolean doParse() throws Exception {
172172
while (consumerReady() && reader.readBatch()) {
173173

174174
context.setLoaded();
175-
context.flip();
176175
consumer().onBatch();
177-
context.setUnloaded();
178176

179-
reader.resetBatch(context.getBackBuffer());
177+
reader.resetBatch(context.getVsr());
180178
}
181179

182180
return reader.endOfStream();

tracdap-libs/tracdap-lib-data/src/main/java/org/finos/tracdap/common/data/ArrowVsrContext.java

Lines changed: 14 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -30,15 +30,12 @@ public class ArrowVsrContext {
3030
private final ArrowVsrSchema schema;
3131

3232
private final BufferAllocator allocator;
33-
private final VectorSchemaRoot front;
34-
private final VectorSchemaRoot back;
33+
private final VectorSchemaRoot vsr;
3534
private final DictionaryProvider dictionaries;
3635

3736
private final boolean ownership;
3837

39-
private boolean backBufferLoaded;
40-
private boolean frontBufferAvailable;
41-
private boolean frontBufferUnloaded;
38+
private boolean loaded;
4239

4340
public static ArrowVsrContext forSource(VectorSchemaRoot source, DictionaryProvider dictionaries, BufferAllocator allocator) {
4441

@@ -56,8 +53,7 @@ private ArrowVsrContext(VectorSchemaRoot source, DictionaryProvider dictionaries
5653
this.schema = new ArrowVsrSchema(source.getSchema(), dictionaries);
5754

5855
this.allocator = allocator;
59-
this.back = source;
60-
this.front = back; // No double buffering yet
56+
this.vsr = source;
6157
this.dictionaries = dictionaries;
6258

6359
this.ownership = takeOwnership;
@@ -84,8 +80,7 @@ private ArrowVsrContext(ArrowVsrSchema schema, BufferAllocator allocator) {
8480
var root = new VectorSchemaRoot(fields, vectors);
8581
root.allocateNew();
8682

87-
this.back = root;
88-
this.front = root; // No double buffering yet
83+
this.vsr = root;
8984

9085
// Use pre-defined dictionaries from the schema (if there are any)
9186
this.dictionaries = schema.dictionaries();
@@ -102,55 +97,39 @@ public BufferAllocator getAllocator() {
10297
return allocator;
10398
}
10499

105-
public VectorSchemaRoot getBackBuffer() {
106-
return back;
107-
}
108-
109-
public VectorSchemaRoot getFrontBuffer() {
110-
return front;
100+
public VectorSchemaRoot getVsr() {
101+
return vsr;
111102
}
112103

113104
public DictionaryProvider getDictionaries() {
114105
return dictionaries;
115106
}
116-
117-
public boolean readyToLoad() {
118-
return ! backBufferLoaded;
119-
}
120-
121107
public void setRowCount(int nRows) {
122108

123-
back.setRowCount(nRows);
109+
vsr.setRowCount(nRows);
124110
}
125111

126112
public void setLoaded() {
127-
backBufferLoaded = true;
113+
loaded = true;
128114
}
129115

130-
public boolean readyToFlip() {
131-
return backBufferLoaded && (frontBufferUnloaded || !frontBufferAvailable);
116+
public void setUnloaded() {
117+
loaded = false;
132118
}
133119

134-
public void flip() {
135-
frontBufferAvailable = true;
136-
frontBufferUnloaded = false;
120+
public boolean readyToLoad() {
121+
return !loaded;
137122
}
138123

139124
public boolean readyToUnload() {
140-
return frontBufferAvailable && !frontBufferUnloaded;
141-
}
142-
143-
public void setUnloaded() {
144-
frontBufferUnloaded = true;
145-
frontBufferAvailable = false;
146-
backBufferLoaded = false;
125+
return loaded;
147126
}
148127

149128
public void close() {
150129

151130
if (ownership) {
152131

153-
back.close();
132+
vsr.close();
154133

155134
if (dictionaries != null) {
156135
for (var dictionaryId : dictionaries.getDictionaryIds()) {

tracdap-libs/tracdap-lib-data/src/main/java/org/finos/tracdap/common/data/pipeline/CounterStage.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,7 @@ public long getRowCount() {
5353
@Override
5454
public void onBatch() {
5555
batchCount++;
56-
rowCount += batch.getFrontBuffer().getRowCount();
56+
rowCount += batch.getVsr().getRowCount();
5757
consumer().onBatch();
5858
}
5959

0 commit comments

Comments
 (0)