S3SourceConnector loses offsets on JVM restart due to Map.of() non-deterministic iteration order #599

Description

@rlanhellas

Bug Description

The S3SourceConnector restarts processing from the beginning after every Kafka Connect cluster/JVM restart, even when offsets were previously committed to the connect-offsets topic. The connector loses all progress and reprocesses already-consumed files.

Root Cause

S3OffsetManagerEntry.java builds the partition key using Map.of() in two places:

// asKey() static method
public static OffsetManager.OffsetManagerKey asKey(final String bucket, final String s3ObjectKey) {
    return () -> Map.of(BUCKET, bucket, OBJECT_KEY, s3ObjectKey);
}

// getManagerKey() instance method
@Override
public OffsetManager.OffsetManagerKey getManagerKey() {
    return () -> Map.of(BUCKET, bucket, OBJECT_KEY, objectKey);
}

Map.of() returns an ImmutableCollections.MapN whose iteration order is intentionally randomized at JVM startup via a System.nanoTime() seed:

// OpenJDK ImmutableCollections.java (abridged)
static {
    long color = 0x243F_6A88_85A3_08D3L; // arbitrary mixing constant (a slice of pi)
    long seed = System.nanoTime(); // differs on each JVM start
    SALT32L = (int)((color * seed) >> 16) & 0xFFFF_FFFFL;
    REVERSE = (SALT32L & 1) == 0; // 50/50 flip on each restart
}

REVERSE controls the direction of iteration. For a 2-entry map like Map.of(BUCKET, ..., OBJECT_KEY, ...), this means there is a ~50% chance on each JVM restart that the two fields are serialized in a different order than in the previous run.
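The difference can be observed outside Kafka Connect with a minimal sketch like the one below. The class and method names (MapOrderDemo, keyOrder, immutableKey, orderedKey) are hypothetical and not part of the connector; the constant values "bucket" and "objectKey" are the key names used in this issue. Run in several fresh JVMs, the first printed line may change between runs, while the second should always be the same.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class, not part of the connector.
final class MapOrderDemo {
    static final String BUCKET = "bucket";
    static final String OBJECT_KEY = "objectKey";

    // The key order a serializer sees when it walks the map's iterator.
    static List<String> keyOrder(final Map<String, Object> m) {
        return new ArrayList<>(m.keySet());
    }

    // Same construction as the current asKey(): order is unspecified.
    static Map<String, Object> immutableKey(final String bucket, final String objectKey) {
        return Map.of(BUCKET, bucket, OBJECT_KEY, objectKey);
    }

    // Insertion-ordered variant: always bucket first, then objectKey.
    static Map<String, Object> orderedKey(final String bucket, final String objectKey) {
        final Map<String, Object> m = new LinkedHashMap<>();
        m.put(BUCKET, bucket);
        m.put(OBJECT_KEY, objectKey);
        return m;
    }

    public static void main(final String[] args) {
        // May differ from one JVM run to the next.
        System.out.println("Map.of():      " + keyOrder(immutableKey("bucket-name", "PATH/HERE")));
        // Stable across JVM runs.
        System.out.println("LinkedHashMap: " + keyOrder(orderedKey("bucket-name", "PATH/HERE")));
    }
}
```

Note that the two maps are equal according to Map.equals(); only their iteration order, and therefore their serialized form, can differ.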

Failure Chain

JVM restart
  → new SALT32L + REVERSE randomized by System.nanoTime()
  → Map.of("bucket", ..., "objectKey", ...) iterates fields in different order
  → JsonConverter serializes the Map to different JSON bytes
      JVM run 1: {"objectKey":"PATH/HERE","bucket":"bucket-name"}
      JVM run 2: {"bucket":"bucket-name","objectKey":"PATH/HERE"}
  → OffsetStorageReaderImpl does ByteBuffer byte-level lookup
  → lookup misses the previously stored offset key
  → connector treats all files as unprocessed
  → starts consuming from the very first object in the bucket

This was confirmed by inspecting the connect-offsets Kafka topic directly: two different key formats for the same logical partition were observed, one written before the restart and one after.
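The lookup miss in the failure chain can be simulated with a small sketch. Everything here is an assumption made for illustration: OffsetLookupDemo, serialize and ordered are hypothetical names, the hand-rolled serializer only stands in for JsonConverter (no escaping, string values only), and a HashMap keyed by ByteBuffer stands in for the offset store. The field order is forced with LinkedHashMap so that both "JVM runs" can be shown in one process.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of Kafka Connect or the connector.
final class OffsetLookupDemo {
    // Stand-in for JSON serialization: emits fields in the map's iteration order.
    static ByteBuffer serialize(final Map<String, String> partition) {
        final String json = partition.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
                .collect(Collectors.joining(",", "{", "}"));
        return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
    }

    // Builds a two-field map with an explicitly chosen field order.
    static Map<String, String> ordered(final String k1, final String v1,
                                       final String k2, final String v2) {
        final Map<String, String> m = new LinkedHashMap<>();
        m.put(k1, v1);
        m.put(k2, v2);
        return m;
    }

    public static void main(final String[] args) {
        // Simulated offset store, keyed by the serialized partition bytes.
        final Map<ByteBuffer, Long> store = new HashMap<>();

        // "JVM run 1" happened to iterate objectKey first and stored an offset.
        store.put(serialize(ordered("objectKey", "PATH/HERE", "bucket", "bucket-name")), 42L);

        // "JVM run 2" iterates bucket first: same logical key, different bytes.
        final Long afterRestart =
                store.get(serialize(ordered("bucket", "bucket-name", "objectKey", "PATH/HERE")));
        System.out.println(afterRestart); // null: the stored offset is not found

        // With the same field order the lookup succeeds.
        final Long sameOrder =
                store.get(serialize(ordered("objectKey", "PATH/HERE", "bucket", "bucket-name")));
        System.out.println(sameOrder); // 42
    }
}
```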

Impact

  • Any cluster restart (planned or unplanned) causes the connector to reprocess all objects from the beginning of the bucket prefix.
  • Data duplication in the destination topic proportional to the total number of objects in the bucket.
  • No data loss, but potentially severe reprocessing overhead and downstream side effects for non-idempotent consumers.

Reproduction

  1. Configure S3SourceConnector with a bucket containing multiple objects.
  2. Let it run and commit offsets (wait for at least one offset.flush.interval.ms cycle).
  3. Restart the Kafka Connect cluster (JVM restart).
  4. Observe: the connector begins reprocessing from the first object instead of resuming.
  5. Inspect connect-offsets topic: the key written after the restart has fields in a different JSON order than the key written before.

The failure is intermittent: because of the randomized REVERSE flag, each restart has a ~50% chance of triggering it, so several restarts may be needed to reproduce.

Fix

Replace Map.of() with LinkedHashMap (insertion-ordered, deterministic across JVM restarts) in both asKey() and getManagerKey():

public static OffsetManager.OffsetManagerKey asKey(final String bucket, final String s3ObjectKey) {
    return () -> {
        final Map<String, Object> m = new LinkedHashMap<>();
        m.put(BUCKET, bucket);
        m.put(OBJECT_KEY, s3ObjectKey);
        return m;
    };
}

@Override
public OffsetManager.OffsetManagerKey getManagerKey() {
    return () -> {
        final Map<String, Object> m = new LinkedHashMap<>();
        m.put(BUCKET, bucket);
        m.put(OBJECT_KEY, objectKey);
        return m;
    };
}

Alternatively, a TreeMap (sorted by key name) would also work and would additionally guarantee alphabetical ordering regardless of insertion order.
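A minimal sketch of the TreeMap variant, together with the kind of ordering check a regression test could make, might look like this. SortedKeyDemo and sortedKey are hypothetical names, and the constant values "bucket" and "objectKey" are assumed from the key names shown in this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo class, not part of the connector.
final class SortedKeyDemo {
    static final String BUCKET = "bucket";
    static final String OBJECT_KEY = "objectKey";

    // TreeMap iterates in natural key order, so insertion order is irrelevant.
    static Map<String, Object> sortedKey(final String bucket, final String objectKey) {
        final Map<String, Object> m = new TreeMap<>();
        m.put(OBJECT_KEY, objectKey); // deliberately inserted "out of order"
        m.put(BUCKET, bucket);
        return m;
    }

    public static void main(final String[] args) {
        final List<String> order = new ArrayList<>(sortedKey("bucket-name", "PATH/HERE").keySet());
        // The check a regression test could make: field order is fixed.
        if (!order.equals(List.of(BUCKET, OBJECT_KEY))) {
            throw new IllegalStateException("unexpected key order: " + order);
        }
        System.out.println(order); // [bucket, objectKey]
    }
}
```

With these two key names, LinkedHashMap (inserting bucket first) and TreeMap produce the same field order, so either choice would serialize to the same bytes.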

Note on Existing Deployments

Users who have already been affected by this bug will have two different key formats coexisting in their connect-offsets topic. After applying the fix, the connector will consistently use one format — but existing offsets stored under the old (opposite) format will remain unreachable. Affected users will need to either:

  • Manually rewrite the stale offset entries to the new key format, or
  • Accept a one-time reprocessing from the beginning after upgrading.

Environment

  • Connector: io.aiven.kafka.connect.s3.source.S3SourceConnector
  • Storage backend: GCS via S3-compatible API (https://storage.googleapis.com)
  • Kafka Connect: Strimzi-managed cluster
  • Java: OpenJDK 9+ (any version that provides Map.of(); its iteration order is randomized per JVM run, and recent releases additionally use the REVERSE flag shown above)

Labels

bug: Something isn't working