From 281cf5d2d22670692e2edd072c8fd36fc4579088 Mon Sep 17 00:00:00 2001 From: MaheshAravindV Date: Fri, 13 Jun 2025 15:33:33 +0000 Subject: [PATCH 1/5] Ensure most specific serializer is used When there are multiple serializers available for that a type can be assigned to, the most specific one should be used Signed-off-by: Mahesh Aravind V --- .../DelegatingByTypeSerializer.java | 26 ++++++++- .../DelegatingByTypeSerializerTest.java | 55 +++++++++++++++++++ 2 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTest.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java index 2c0cb21de7..ab2b91bce1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java @@ -16,7 +16,9 @@ package org.springframework.kafka.support.serializer; +import java.util.Comparator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -42,6 +44,17 @@ public class DelegatingByTypeSerializer implements Serializer { private final boolean assignable; + private static final Comparator, Serializer>> DELEGATES_ASSIGNABILITY_COMPARATOR = + (entry1, entry2) -> { + Class type1 = entry1.getKey(); + Class type2 = entry2.getKey(); + + if (type1.isAssignableFrom(type2)) return 1; + if (type2.isAssignableFrom(type1)) return -1; + + return 0; + }; + /** * Construct an instance with the map of delegates; keys matched exactly. * @param delegates the delegates. @@ -62,7 +75,16 @@ public DelegatingByTypeSerializer(Map, Serializer> delegates) { public DelegatingByTypeSerializer(Map, Serializer> delegates, boolean assignable) { Assert.notNull(delegates, "'delegates' cannot be null"); Assert.noNullElements(delegates.values(), "Serializers in delegates map cannot be null"); - this.delegates.putAll(delegates); + + if (!assignable) { + this.delegates.putAll(delegates); + } else { + List, Serializer>> sortedDelegates = delegates.entrySet().stream() + .sorted(DELEGATES_ASSIGNABILITY_COMPARATOR).toList(); + + sortedDelegates.forEach(it -> this.delegates.put(it.getKey(), it.getValue())); + } + this.assignable = assignable; } @@ -101,7 +123,7 @@ public byte[] serialize(String topic, Headers headers, Object data) { return delegate.serialize(topic, headers, data); } - private Serializer findDelegate(T data) { + protected Serializer findDelegate(T data) { return findDelegate(data, this.delegates); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTest.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTest.java new file mode 100644 index 0000000000..b0487a489e --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2021-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.serializer; + +import org.apache.kafka.common.serialization.Serializer; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.mock; + +class DelegatingByTypeSerializerTest { + @Nested + class AssignableTest { + @Test + void shouldOrderDelegatesSoChildComesBeforeParent() { + class Parent {} + class Child extends Parent {} + Serializer mockParentSerializer = mock(Serializer.class); + Serializer mockChildSerializer = mock(Serializer.class); + + // Using LinkedHashMap to ensure the order is always wrong + Map, Serializer> delegates = new LinkedHashMap<>(); + delegates.put(Parent.class, mockParentSerializer); + delegates.put(Child.class, mockChildSerializer); + + DelegatingByTypeSerializer serializer = new DelegatingByTypeSerializer(delegates, true); + + + Serializer childSerializer = serializer.findDelegate(mock(Child.class)); + Serializer parentSerializer = serializer.findDelegate(mock(Parent.class)); + + + assertEquals(mockChildSerializer, childSerializer); + assertEquals(mockParentSerializer, parentSerializer); + } + } +} \ No newline at end of file From 4d30cf28172a40250f71ea663eafd6a5c6f747cd Mon Sep 17 00:00:00 2001 From: MaheshAravindV Date: Fri, 13 Jun 2025 15:42:36 +0000 Subject: [PATCH 2/5] Add author credits Signed-off-by: Mahesh Aravind V --- .../kafka/support/serializer/DelegatingByTypeSerializer.java | 1 + .../support/serializer/DelegatingByTypeSerializerTest.java | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java index ab2b91bce1..5d7031c887 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java @@ -34,6 +34,7 @@ * @author Gary Russell * @author Artem Bilan * @author Wang Zhiyang + * @author Mahesh Aravind V * * @since 2.7.9 * diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTest.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTest.java index b0487a489e..63045d3a88 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTest.java @@ -26,6 +26,10 @@ import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.mock; +/** + * @author Mahesh Aravind V + * + */ class DelegatingByTypeSerializerTest { @Nested class AssignableTest { From b36f3af1eacc52ae6e71e39b844e30b0eb1b220d Mon Sep 17 00:00:00 2001 From: MaheshAravindV Date: Fri, 13 Jun 2025 15:49:15 +0000 Subject: [PATCH 3/5] Update javadoc Signed-off-by: Mahesh Aravind V --- .../serializer/DelegatingByTypeSerializer.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java index 5d7031c887..e657935883 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java @@ -65,12 +65,12 @@ public DelegatingByTypeSerializer(Map, Serializer> delegates) { } /** - * Construct an instance with the map of delegates; keys matched exactly or if the - * target object is assignable to the key, depending on the assignable argument. - * If assignable, entries are checked in the natural entry order so an ordered map - * such as a {@link LinkedHashMap} is recommended. - * @param delegates the delegates. - * @param assignable whether the target is assignable to the key. + * Construct an instance with the map of delegates. + * If {@code assignable} is {@code false}, only exact key matches are considered. + * If {@code assignable} is {@code true}, a delegate is selected if its key class + * is assignable from the target object's class. When multiple matches are possible, + * the most specific matching class is selected — that is, the closest match in the + * class hierarchy. * @since 2.8.3 */ public DelegatingByTypeSerializer(Map, Serializer> delegates, boolean assignable) { From 0e586c9240a7b8a41526c076bda86eae63f09699 Mon Sep 17 00:00:00 2001 From: MaheshAravindV Date: Fri, 13 Jun 2025 16:26:06 +0000 Subject: [PATCH 4/5] Fix styling violations Signed-off-by: Mahesh Aravind V --- .../DelegatingByTypeSerializer.java | 24 +++++++++++++------ ...a => DelegatingByTypeSerializerTests.java} | 22 +++++++++-------- 2 files changed, 29 insertions(+), 17 deletions(-) rename spring-kafka/src/test/java/org/springframework/kafka/support/serializer/{DelegatingByTypeSerializerTest.java => DelegatingByTypeSerializerTests.java} (85%) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java index e657935883..e381f6ef7c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java @@ -41,21 +41,25 @@ */ public class DelegatingByTypeSerializer implements Serializer { - private final Map, Serializer> delegates = new LinkedHashMap<>(); - - private final boolean assignable; - private static final Comparator, Serializer>> DELEGATES_ASSIGNABILITY_COMPARATOR = (entry1, entry2) -> { Class type1 = entry1.getKey(); Class type2 = entry2.getKey(); - if (type1.isAssignableFrom(type2)) return 1; - if (type2.isAssignableFrom(type1)) return -1; + if (type1.isAssignableFrom(type2)) { + return 1; + } + if (type2.isAssignableFrom(type1)) { + return -1; + } return 0; }; + private final Map, Serializer> delegates = new LinkedHashMap<>(); + + private final boolean assignable; + /** * Construct an instance with the map of delegates; keys matched exactly. * @param delegates the delegates. @@ -71,6 +75,11 @@ public DelegatingByTypeSerializer(Map, Serializer> delegates) { * is assignable from the target object's class. When multiple matches are possible, * the most specific matching class is selected — that is, the closest match in the * class hierarchy. + * + * @param delegates the delegates + * @param assignable true if {@link #findDelegate(Object, Map)} should consider assignability to + * the key rather than an exact match. + * * @since 2.8.3 */ public DelegatingByTypeSerializer(Map, Serializer> delegates, boolean assignable) { @@ -79,7 +88,8 @@ public DelegatingByTypeSerializer(Map, Serializer> delegates, boolea if (!assignable) { this.delegates.putAll(delegates); - } else { + } + else { List, Serializer>> sortedDelegates = delegates.entrySet().stream() .sorted(DELEGATES_ASSIGNABILITY_COMPARATOR).toList(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTest.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTests.java similarity index 85% rename from spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTest.java rename to spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTests.java index 63045d3a88..5fe72fe233 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTests.java @@ -16,27 +16,29 @@ package org.springframework.kafka.support.serializer; +import java.util.LinkedHashMap; +import java.util.Map; + import org.apache.kafka.common.serialization.Serializer; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import java.util.LinkedHashMap; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; /** * @author Mahesh Aravind V * */ -class DelegatingByTypeSerializerTest { +class DelegatingByTypeSerializerTests { @Nested class AssignableTest { @Test void shouldOrderDelegatesSoChildComesBeforeParent() { - class Parent {} - class Child extends Parent {} + class Parent { } + + class Child extends Parent { } + Serializer mockParentSerializer = mock(Serializer.class); Serializer mockChildSerializer = mock(Serializer.class); @@ -52,8 +54,8 @@ class Child extends Parent {} Serializer parentSerializer = serializer.findDelegate(mock(Parent.class)); - assertEquals(mockChildSerializer, childSerializer); - assertEquals(mockParentSerializer, parentSerializer); + assertThat(childSerializer).isEqualTo(mockChildSerializer); + assertThat(parentSerializer).isEqualTo(mockParentSerializer); } } -} \ No newline at end of file +} From 535bfb38c2810ac6f2bcdecb5db66d3a0213fef1 Mon Sep 17 00:00:00 2001 From: Mahesh Aravind V Date: Mon, 16 Jun 2025 15:47:10 +0000 Subject: [PATCH 5/5] Use TreeMap --- .../DelegatingByTypeSerializer.java | 24 ++++--------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java index e381f6ef7c..35e7a26ee0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java @@ -16,10 +16,7 @@ package org.springframework.kafka.support.serializer; -import java.util.Comparator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; import org.apache.kafka.common.errors.SerializationException; @@ -41,10 +38,8 @@ */ public class DelegatingByTypeSerializer implements Serializer { - private static final Comparator, Serializer>> DELEGATES_ASSIGNABILITY_COMPARATOR = - (entry1, entry2) -> { - Class type1 = entry1.getKey(); - Class type2 = entry2.getKey(); + private static final Comparator> DELEGATES_ASSIGNABILITY_COMPARATOR = + (type1, type2) -> { if (type1.isAssignableFrom(type2)) { return 1; @@ -56,7 +51,7 @@ public class DelegatingByTypeSerializer implements Serializer { return 0; }; - private final Map, Serializer> delegates = new LinkedHashMap<>(); + private final Map, Serializer> delegates = new TreeMap<>(DELEGATES_ASSIGNABILITY_COMPARATOR); private final boolean assignable; @@ -86,16 +81,7 @@ public DelegatingByTypeSerializer(Map, Serializer> delegates, boolea Assert.notNull(delegates, "'delegates' cannot be null"); Assert.noNullElements(delegates.values(), "Serializers in delegates map cannot be null"); - if (!assignable) { - this.delegates.putAll(delegates); - } - else { - List, Serializer>> sortedDelegates = delegates.entrySet().stream() - .sorted(DELEGATES_ASSIGNABILITY_COMPARATOR).toList(); - - sortedDelegates.forEach(it -> this.delegates.put(it.getKey(), it.getValue())); - } - + this.delegates.putAll(delegates); this.assignable = assignable; }