Skip to content

[FLINK-32097][Connectors/Kinesis] Implement support for Kinesis deaggregation #188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

Lzgpom
Copy link

@Lzgpom Lzgpom commented Feb 1, 2025

Purpose of the change

This PR implements the Kinesis deaggregation. The implementation was heavily based on the old kinesis connector.

Verifying this change

This change added tests and can be verified as follows:

  • Added integration tests for with record deaggregation
  • Added unit tests
  • Manually verified by running the Kinesis connector on a local Flink cluster.

Further ToDos and Follow-ups

Checkpoints do not take into consideration the sequence number of the deaggregated records.

Brief change log

Significant changes

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

In order to implement record deaggregation, the RecordBatch uses the AggregatorUtil, from the KCL 3, with the subscribed shard to deaggregate.
The deaggregated records are now  instance of KinesisClientRecord since the AggregatorUtil requires it. Therefore, there is a bit of refactor to update the class of the record that was received.

Also, the RecordBatch class was extracted out of KinesisShardSplitReaderBase
…ng deaggregation.

There was a need to create aggregated records to test if the records are being deaggregated correctly.
For that there is a dependency that can create aggregated records, but unfortunately it does not have a version available in the maven repository that is compatible with the kcl 3.x. So it uses a release of github that is compatible.

Also, the protobuf version set in the dependency management was not compatible with the kcl 3.x
Copy link

boring-cyborg bot commented Feb 1, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@Lzgpom
Copy link
Author

Lzgpom commented Feb 5, 2025

@hlteoh37 could you take a look?

@nicusX
Copy link

nicusX commented Feb 7, 2025

@Lzgpom Building this locally and I've got a spotless-check format violation.
mvn spotless:apply fixes it

@nicusX
Copy link

nicusX commented Feb 7, 2025

I tested it locally, with a stream produced by KPL with aggregation enabled, and it worked fine.

@Lzgpom
Copy link
Author

Lzgpom commented Feb 7, 2025

@Lzgpom Building this locally and I've got a spotless-check format violation. mvn spotless:apply fixes it

I tried to run mvn spotless:apply but it didn't have any changes. It allows reported that Spotless apply skipped

@nicusX
Copy link

nicusX commented Feb 7, 2025

Uhm, spotless:apply it did some changes for me, and after that mvn package succeeded

@nicusX
Copy link

nicusX commented Feb 7, 2025

If I run a simple mvn clean package on your branch I get spotess-check violations in RecordBatchTest.java + 10 other files. And mvn spotless:apply changes 12 files.
All style definitions are part of the repo.
Not sure I understand the different result

@Lzgpom
Copy link
Author

Lzgpom commented Feb 7, 2025

@nicusX I was using the java 17 and the spotless did not work. I changed to java 11 and it worked.
Thanks for notifying me about this.

Copy link

@nicusX nicusX left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes LGTM
I verified that it now does not fail checkstyle checks.

Note for maintainers: There is a breaking change in the public API that forces a major version change.

<!-- the kinesis aggregator dependency since it is not available in maven central -->
<!-- look into issue https://github.com/awslabs/kinesis-aggregation/issues/120 -->
<groupId>com.github.awslabs.kinesis-aggregation</groupId>
<artifactId>amazon-kinesis-aggregator</artifactId>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -60,7 +60,7 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex
* @param output the identifier of the shard the record was sent to
* @throws IOException exception when deserializing record
*/
void deserialize(Record record, String stream, String shardId, Collector<T> output)
void deserialize(KinesisClientRecord record, String stream, String shardId, Collector<T> output)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change to the public interface

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@Lzgpom Have we considered alternatives to changing the interface? I can see that Hong suggested wrapping internally. Have we considered alternatives such as having a separate KinesisClientRecordDeserializationSchema?

@@ -50,10 +49,10 @@
/** Base implementation of the SplitReader for reading from KinesisShardSplits. */
@Internal
public abstract class KinesisShardSplitReaderBase
implements SplitReader<Record, KinesisShardSplit> {
implements SplitReader<KinesisClientRecord, KinesisShardSplit> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change actually changes the interface exposed for the KinesisSource, so it would be a backwards incompatible change. Is there a way we can wrap this internally?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been looking at the implications of not changing from Record into KinesisClientRecord and this is what I have found:

  • The deaggreated records would have to be converted from KinesisClientRecord, since AggregatorUtil only uses KinesisClientRecord, to Record adding a slight overhead.
  • The KinesisClientRecord has the subSequenceNumber that could and should be used for checkpointing.

Apart from that I don't see any more issues. Either way I don't mind changing it back to Record.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Lzgpom Can you elaborate on what the user experience would be with the current changes in PR? Will it break compilation for existing apps that already depend on KinesisSource without further code changes?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Lzgpom I think the gist is, can you please change the interface back to using Record in all @Public and @PublicEvolving public methods?
This would allow including the change in a minor version of the connector, and users of 5.0.0 would be able to just update the connector version with no code changes

Comment on lines +37 to +41
<!-- used for the kinesis aggregator dependency since it is not available in maven central -->
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to work with the owner of the repo to get this published instead?

@@ -50,10 +49,10 @@
/** Base implementation of the SplitReader for reading from KinesisShardSplits. */
@Internal
public abstract class KinesisShardSplitReaderBase
implements SplitReader<Record, KinesisShardSplit> {
implements SplitReader<KinesisClientRecord, KinesisShardSplit> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Lzgpom Can you elaborate on what the user experience would be with the current changes in PR? Will it break compilation for existing apps that already depend on KinesisSource without further code changes?

@@ -60,7 +60,7 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex
* @param output the identifier of the shard the record was sent to
* @throws IOException exception when deserializing record
*/
void deserialize(Record record, String stream, String shardId, Collector<T> output)
void deserialize(KinesisClientRecord record, String stream, String shardId, Collector<T> output)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@Lzgpom Have we considered alternatives to changing the interface? I can see that Hong suggested wrapping internally. Have we considered alternatives such as having a separate KinesisClientRecordDeserializationSchema?

@@ -52,6 +60,11 @@ under the License.
<artifactId>kinesis</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a shame, can we do this without KCL? It's a heavy weight dependency for the 1 class(?) needed for deaggregation. We have been talking about removing this dependency with the new connector for years, but never really solved this bit. Can we ask Kinesis to publish a separate library for this? Or, we could just copy that file (assuming it is still a small asking of code)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dannycranmer KinesisClientRecord has some transitive dependencies.
However, the KCL can be substantially lightened removing most of transitive dependencies
See this example which is an alternative solution, implementing de-aggregation in the Deserialization Schema

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants