diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java index 2c0cb21de7..35e7a26ee0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java @@ -16,8 +16,7 @@ package org.springframework.kafka.support.serializer; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; import org.apache.kafka.common.errors.SerializationException; @@ -32,13 +31,27 @@ * @author Gary Russell * @author Artem Bilan * @author Wang Zhiyang + * @author Mahesh Aravind V * * @since 2.7.9 * */ public class DelegatingByTypeSerializer implements Serializer { - private final Map, Serializer> delegates = new LinkedHashMap<>(); + private static final Comparator> DELEGATES_ASSIGNABILITY_COMPARATOR = + (type1, type2) -> { + + if (type1.isAssignableFrom(type2)) { + return 1; + } + if (type2.isAssignableFrom(type1)) { + return -1; + } + + return 0; + }; + + private final Map, Serializer> delegates = new TreeMap<>(DELEGATES_ASSIGNABILITY_COMPARATOR); private final boolean assignable; @@ -51,17 +64,23 @@ public DelegatingByTypeSerializer(Map, Serializer> delegates) { } /** - * Construct an instance with the map of delegates; keys matched exactly or if the - * target object is assignable to the key, depending on the assignable argument. - * If assignable, entries are checked in the natural entry order so an ordered map - * such as a {@link LinkedHashMap} is recommended. - * @param delegates the delegates. - * @param assignable whether the target is assignable to the key. + * Construct an instance with the map of delegates. + * If {@code assignable} is {@code false}, only exact key matches are considered. + * If {@code assignable} is {@code true}, a delegate is selected if its key class + * is assignable from the target object's class. When multiple matches are possible, + * the most specific matching class is selected — that is, the closest match in the + * class hierarchy. + * + * @param delegates the delegates + * @param assignable true if {@link #findDelegate(Object, Map)} should consider assignability to + * the key rather than an exact match. + * * @since 2.8.3 */ public DelegatingByTypeSerializer(Map, Serializer> delegates, boolean assignable) { Assert.notNull(delegates, "'delegates' cannot be null"); Assert.noNullElements(delegates.values(), "Serializers in delegates map cannot be null"); + this.delegates.putAll(delegates); this.assignable = assignable; } @@ -101,7 +120,7 @@ public byte[] serialize(String topic, Headers headers, Object data) { return delegate.serialize(topic, headers, data); } - private Serializer findDelegate(T data) { + protected Serializer findDelegate(T data) { return findDelegate(data, this.delegates); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTests.java new file mode 100644 index 0000000000..5fe72fe233 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializerTests.java @@ -0,0 +1,61 @@ +/* + * Copyright 2021-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.serializer; + +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.kafka.common.serialization.Serializer; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +/** + * @author Mahesh Aravind V + * + */ +class DelegatingByTypeSerializerTests { + @Nested + class AssignableTest { + @Test + void shouldOrderDelegatesSoChildComesBeforeParent() { + class Parent { } + + class Child extends Parent { } + + Serializer mockParentSerializer = mock(Serializer.class); + Serializer mockChildSerializer = mock(Serializer.class); + + // Using LinkedHashMap to ensure the order is always wrong + Map, Serializer> delegates = new LinkedHashMap<>(); + delegates.put(Parent.class, mockParentSerializer); + delegates.put(Child.class, mockChildSerializer); + + DelegatingByTypeSerializer serializer = new DelegatingByTypeSerializer(delegates, true); + + + Serializer childSerializer = serializer.findDelegate(mock(Child.class)); + Serializer parentSerializer = serializer.findDelegate(mock(Parent.class)); + + + assertThat(childSerializer).isEqualTo(mockChildSerializer); + assertThat(parentSerializer).isEqualTo(mockParentSerializer); + } + } +}