Commit 767f576
[FLINK-32097][Connectors/Kinesis] Implemented record deaggregation.
To implement record deaggregation, RecordBatch now uses the AggregatorUtil from KCL 3.x, together with the subscribed shard, to deaggregate incoming records. The deaggregated records are instances of KinesisClientRecord, since AggregatorUtil requires that type, so the record type was refactored throughout the reader, emitter, and deserialization classes. The RecordBatch class was also extracted out of KinesisShardSplitReaderBase into its own file.
1 parent b55dec1 commit 767f576
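
For reference, a minimal sketch of the deaggregation flow this commit wires in. The helper class and variable names below are illustrative, not part of the commit; the two KCL 3.x calls (KinesisClientRecord.fromRecord and AggregatorUtil.deaggregate) are the ones the new RecordBatch class uses.

import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.util.List;
import java.util.stream.Collectors;

final class DeaggregationSketch {

    /** Deaggregates possibly KPL-aggregated records fetched from one shard. */
    static List<KinesisClientRecord> deaggregate(
            List<Record> sdkRecords, String startingHashKey, String endingHashKey) {
        // AggregatorUtil operates on KCL records, so convert the AWS SDK v2 records first.
        List<KinesisClientRecord> converted =
                sdkRecords.stream()
                        .map(KinesisClientRecord::fromRecord)
                        .collect(Collectors.toList());
        // The shard's hash key range lets AggregatorUtil drop subrecords whose
        // explicit hash key falls outside this shard (possible after a reshard).
        return new AggregatorUtil().deaggregate(converted, startingHashKey, endingHashKey);
    }
}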

File tree

15 files changed: +241 −137 lines

flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -52,6 +52,11 @@ under the License.
     <artifactId>kinesis</artifactId>
 </dependency>

+<dependency>
+    <groupId>software.amazon.kinesis</groupId>
+    <artifactId>amazon-kinesis-client</artifactId>
+</dependency>
+
 <dependency>
     <groupId>software.amazon.awssdk</groupId>
     <artifactId>arns</artifactId>

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java

Lines changed: 2 additions & 2 deletions
@@ -70,9 +70,9 @@
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
 import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
-import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 import software.amazon.awssdk.utils.AttributeMap;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;

 import java.time.Duration;
 import java.util.Map;
@@ -209,7 +209,7 @@ public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
         return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
     }

-    private Supplier<SplitReader<Record, KinesisShardSplit>> getKinesisShardSplitReaderSupplier(
+    private Supplier<SplitReader<KinesisClientRecord, KinesisShardSplit>> getKinesisShardSplitReaderSupplier(
             Configuration sourceConfig, Map<String, KinesisShardMetrics> shardMetricGroupMap) {
         KinesisSourceConfigOptions.ReaderType readerType = sourceConfig.get(READER_TYPE);
         switch (readerType) {

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java

Lines changed: 12 additions & 42 deletions
@@ -29,8 +29,8 @@

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;

 import javax.annotation.Nullable;

@@ -41,7 +41,6 @@
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;

@@ -50,10 +49,10 @@
 /** Base implementation of the SplitReader for reading from KinesisShardSplits. */
 @Internal
 public abstract class KinesisShardSplitReaderBase
-        implements SplitReader<Record, KinesisShardSplit> {
+        implements SplitReader<KinesisClientRecord, KinesisShardSplit> {

     private static final Logger LOG = LoggerFactory.getLogger(KinesisShardSplitReaderBase.class);
-    private static final RecordsWithSplitIds<Record> INCOMPLETE_SHARD_EMPTY_RECORDS =
+    private static final RecordsWithSplitIds<KinesisClientRecord> INCOMPLETE_SHARD_EMPTY_RECORDS =
             new KinesisRecordsWithSplitIds(Collections.emptyIterator(), null, false);

     private final Deque<KinesisShardSplitState> assignedSplits = new ArrayDeque<>();
@@ -65,7 +64,7 @@ protected KinesisShardSplitReaderBase(Map<String, KinesisShardMetrics> shardMetr
     }

     @Override
-    public RecordsWithSplitIds<Record> fetch() throws IOException {
+    public RecordsWithSplitIds<KinesisClientRecord> fetch() throws IOException {
         KinesisShardSplitState splitState = assignedSplits.poll();

         // When there are no assigned splits, return quickly
@@ -103,7 +102,7 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
                 .get(splitState.getShardId())
                 .setMillisBehindLatest(recordBatch.getMillisBehindLatest());

-        if (recordBatch.getRecords().isEmpty()) {
+        if (recordBatch.getDeaggregatedRecords().isEmpty()) {
             if (recordBatch.isCompleted()) {
                 return new KinesisRecordsWithSplitIds(
                         Collections.emptyIterator(), splitState.getSplitId(), true);
@@ -115,12 +114,12 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
         splitState.setNextStartingPosition(
                 StartingPosition.continueFromSequenceNumber(
                         recordBatch
-                                .getRecords()
-                                .get(recordBatch.getRecords().size() - 1)
+                                .getDeaggregatedRecords()
+                                .get(recordBatch.getDeaggregatedRecords().size() - 1)
                                 .sequenceNumber()));

         return new KinesisRecordsWithSplitIds(
-                recordBatch.getRecords().iterator(),
+                recordBatch.getDeaggregatedRecords().iterator(),
                 splitState.getSplitId(),
                 recordBatch.isCompleted());
     }
@@ -154,48 +153,19 @@ public void pauseOrResumeSplits(
         splitsToResume.forEach(split -> pausedSplitIds.remove(split.splitId()));
     }

-    /**
-     * Dataclass to store a batch of Kinesis records with metadata. Used to pass Kinesis records
-     * from the SplitReader implementation to the SplitReaderBase.
-     */
-    @Internal
-    protected static class RecordBatch {
-        private final List<Record> records;
-        private final long millisBehindLatest;
-        private final boolean completed;
-
-        public RecordBatch(List<Record> records, long millisBehindLatest, boolean completed) {
-            this.records = records;
-            this.millisBehindLatest = millisBehindLatest;
-            this.completed = completed;
-        }
-
-        public List<Record> getRecords() {
-            return records;
-        }
-
-        public long getMillisBehindLatest() {
-            return millisBehindLatest;
-        }
-
-        public boolean isCompleted() {
-            return completed;
-        }
-    }
-
     /**
      * Implementation of {@link RecordsWithSplitIds} for sending Kinesis records from fetcher to the
      * SourceReader.
      */
     @Internal
-    private static class KinesisRecordsWithSplitIds implements RecordsWithSplitIds<Record> {
+    private static class KinesisRecordsWithSplitIds implements RecordsWithSplitIds<KinesisClientRecord> {

-        private final Iterator<Record> recordsIterator;
+        private final Iterator<KinesisClientRecord> recordsIterator;
         private final String splitId;
         private final boolean isComplete;

         public KinesisRecordsWithSplitIds(
-                Iterator<Record> recordsIterator, String splitId, boolean isComplete) {
+                Iterator<KinesisClientRecord> recordsIterator, String splitId, boolean isComplete) {
             this.recordsIterator = recordsIterator;
             this.splitId = splitId;
             this.isComplete = isComplete;
@@ -209,7 +179,7 @@ public String nextSplit() {

         @Nullable
         @Override
-        public Record nextRecordFromSplit() {
+        public KinesisClientRecord nextRecordFromSplit() {
             return recordsIterator.hasNext() ? recordsIterator.next() : null;
         }
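
One subtlety worth noting in fetch(): the next starting position is now derived from the last deaggregated record. This should be safe because, in the KCL model, deaggregated subrecords carry the sequence number of their parent aggregated record (ordering within the batch is tracked separately via subSequenceNumber()), so continuing from the last subrecord's sequenceNumber() resumes after the aggregated record it came from.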

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitter.java

Lines changed: 3 additions & 3 deletions
@@ -26,7 +26,7 @@
 import org.apache.flink.connector.kinesis.source.split.StartingPosition;
 import org.apache.flink.util.Collector;

-import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;

 /**
  * Emits record from the source into the Flink job graph. This serves as the interface between the
@@ -36,7 +36,7 @@
  */
 @Internal
 public class KinesisStreamsRecordEmitter<T>
-        implements RecordEmitter<Record, T, KinesisShardSplitState> {
+        implements RecordEmitter<KinesisClientRecord, T, KinesisShardSplitState> {

     private final KinesisDeserializationSchema<T> deserializationSchema;
     private final SourceOutputWrapper<T> sourceOutputWrapper = new SourceOutputWrapper<>();
@@ -47,7 +47,7 @@ public KinesisStreamsRecordEmitter(KinesisDeserializationSchema<T> deserializati

     @Override
     public void emitRecord(
-            Record element, SourceOutput<T> output, KinesisShardSplitState splitState)
+            KinesisClientRecord element, SourceOutput<T> output, KinesisShardSplitState splitState)
             throws Exception {
         sourceOutputWrapper.setSourceOutput(output);
         sourceOutputWrapper.setTimestamp(element.approximateArrivalTimestamp().toEpochMilli());

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java

Lines changed: 4 additions & 4 deletions
@@ -31,7 +31,7 @@

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;

 import java.util.HashSet;
 import java.util.List;
@@ -45,14 +45,14 @@
 @Internal
 public class KinesisStreamsSourceReader<T>
         extends SingleThreadMultiplexSourceReaderBase<
-                Record, T, KinesisShardSplit, KinesisShardSplitState> {
+                KinesisClientRecord, T, KinesisShardSplit, KinesisShardSplitState> {

     private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceReader.class);
     private final Map<String, KinesisShardMetrics> shardMetricGroupMap;

     public KinesisStreamsSourceReader(
-            SingleThreadFetcherManager<Record, KinesisShardSplit> splitFetcherManager,
-            RecordEmitter<Record, T, KinesisShardSplitState> recordEmitter,
+            SingleThreadFetcherManager<KinesisClientRecord, KinesisShardSplit> splitFetcherManager,
+            RecordEmitter<KinesisClientRecord, T, KinesisShardSplitState> recordEmitter,
             Configuration config,
             SourceReaderContext context,
             Map<String, KinesisShardMetrics> shardMetricGroupMap) {
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java

Lines changed: 78 additions & 0 deletions

@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Dataclass to store a batch of Kinesis records with metadata. Used to pass Kinesis records
+ * from the SplitReader implementation to the SplitReaderBase.
+ *
+ * <p>Input records are de-aggregated using KCL 3.x library. It is expected that AWS SDK v2.x messages
+ * are converted to KCL 3.x {@link KinesisClientRecord}.
+ */
+@Internal
+public class RecordBatch {
+    private final List<KinesisClientRecord> deaggregatedRecords;
+    private final long millisBehindLatest;
+    private final boolean completed;
+
+    public RecordBatch(
+            final List<Record> records,
+            final KinesisShardSplit subscribedShard,
+            final long millisBehindLatest,
+            final boolean completed) {
+        this.deaggregatedRecords = deaggregateRecords(records, subscribedShard);
+        this.millisBehindLatest = millisBehindLatest;
+        this.completed = completed;
+    }
+
+    public List<KinesisClientRecord> getDeaggregatedRecords() {
+        return deaggregatedRecords;
+    }
+
+    public long getMillisBehindLatest() {
+        return millisBehindLatest;
+    }
+
+    public boolean isCompleted() {
+        return completed;
+    }
+
+    private List<KinesisClientRecord> deaggregateRecords(
+            final List<Record> records,
+            final KinesisShardSplit subscribedShard) {
+        final List<KinesisClientRecord> kinesisClientRecords = records.stream()
+                .map(KinesisClientRecord::fromRecord)
+                .collect(Collectors.toList());
+
+        final String startingHashKey = subscribedShard.getStartingHashKey();
+        final String endingHashKey = subscribedShard.getEndingHashKey();
+
+        return new AggregatorUtil().deaggregate(kinesisClientRecords, startingHashKey, endingHashKey);
+    }
+}
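
Two properties of this class follow from the constructor: deaggregation happens eagerly, so getDeaggregatedRecords() is a plain accessor and the AggregatorUtil cost is paid once per fetched batch; and records that were never KPL-aggregated should pass through AggregatorUtil unchanged, so the change is transparent for non-aggregated streams.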

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java

Lines changed: 6 additions & 1 deletion
@@ -23,6 +23,7 @@
 import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
 import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
+import org.apache.flink.connector.kinesis.source.reader.RecordBatch;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;

@@ -69,7 +70,11 @@ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
         if (shardCompleted) {
             splitSubscriptions.remove(splitState.getShardId());
         }
-        return new RecordBatch(event.records(), event.millisBehindLatest(), shardCompleted);
+        return new RecordBatch(
+                event.records(),
+                splitState.getKinesisShardSplit(),
+                event.millisBehindLatest(),
+                shardCompleted);
     }

     @Override

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java

Lines changed: 6 additions & 1 deletion
@@ -24,6 +24,7 @@
 import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
 import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
+import org.apache.flink.connector.kinesis.source.reader.RecordBatch;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;

 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@@ -59,8 +60,12 @@ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
                         splitState.getNextStartingPosition(),
                         this.maxRecordsToGet);
         boolean isCompleted = getRecordsResponse.nextShardIterator() == null;
+
         return new RecordBatch(
-                getRecordsResponse.records(), getRecordsResponse.millisBehindLatest(), isCompleted);
+                getRecordsResponse.records(),
+                splitState.getKinesisShardSplit(),
+                getRecordsResponse.millisBehindLatest(),
+                isCompleted);
     }

     @Override

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchema.java

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@
 import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;
 import org.apache.flink.util.Collector;

-import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;

 import java.io.IOException;
 import java.io.Serializable;
@@ -60,7 +60,7 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex
      * @param output the identifier of the shard the record was sent to
      * @throws IOException exception when deserializing record
      */
-    void deserialize(Record record, String stream, String shardId, Collector<T> output)
+    void deserialize(KinesisClientRecord record, String stream, String shardId, Collector<T> output)
             throws IOException;

     static <T> KinesisDeserializationSchema<T> of(DeserializationSchema<T> deserializationSchema) {
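
With this change, custom schemas implement the interface against KinesisClientRecord. A minimal sketch under the new signature; the UTF-8 string schema here is illustrative, not part of the commit, and getProducedType is assumed to come from the ResultTypeQueryable contract the interface extends.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
import org.apache.flink.util.Collector;

import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative schema that decodes each (possibly deaggregated) record as UTF-8 text. */
public class StringKinesisDeserializationSchema implements KinesisDeserializationSchema<String> {

    @Override
    public void deserialize(
            KinesisClientRecord record, String stream, String shardId, Collector<String> output)
            throws IOException {
        // KinesisClientRecord exposes its payload as a ByteBuffer; copy it out
        // rather than assuming an accessible backing array.
        ByteBuffer data = record.data();
        byte[] bytes = new byte[data.remaining()];
        data.get(bytes);
        output.collect(new String(bytes, StandardCharsets.UTF_8));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}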

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java

Lines changed: 9 additions & 3 deletions
@@ -22,9 +22,10 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.util.Collector;

-import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;

 import java.io.IOException;
+import java.nio.ByteBuffer;

 /**
  * A simple wrapper for using the {@link DeserializationSchema} with the {@link
@@ -48,9 +49,14 @@ public void open(DeserializationSchema.InitializationContext context) throws Exc
     }

     @Override
-    public void deserialize(Record record, String stream, String shardId, Collector<T> output)
+    public void deserialize(KinesisClientRecord record, String stream, String shardId, Collector<T> output)
             throws IOException {
-        deserializationSchema.deserialize(record.data().asByteArray(), output);
+        ByteBuffer recordData = record.data();
+
+        byte[] dataBytes = new byte[recordData.remaining()];
+        recordData.get(dataBytes);
+
+        deserializationSchema.deserialize(dataBytes, output);
     }

     @Override
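
A note on the ByteBuffer handling above: KinesisClientRecord.data() returns a plain ByteBuffer rather than the SDK's SdkBytes, and after deaggregation the buffer may not be array-backed, so the wrapper copies the remaining bytes out explicitly instead of relying on the previous record.data().asByteArray() call.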
