From 9725bf3eaccaf0b375fc2ad50c1a6cfb3ca3ab1b Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 4 Mar 2019 18:24:41 -0500 Subject: [PATCH] GH-2780: ByteArrayLengthHeaderSerializer.inclusive Resolves https://github.com/spring-projects/spring-integration/issues/2780 --- .../ByteArrayLengthHeaderSerializer.java | 45 ++++++++-- .../LengthHeaderSerializationTests.java | 90 +++++++++++++------ src/reference/asciidoc/ip.adoc | 3 + src/reference/asciidoc/whats-new.adoc | 4 + 4 files changed, 109 insertions(+), 33 deletions(-) diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayLengthHeaderSerializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayLengthHeaderSerializer.java index 757a40d3e8a..9faeaee6f6e 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayLengthHeaderSerializer.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayLengthHeaderSerializer.java @@ -21,9 +21,6 @@ import java.io.OutputStream; import java.nio.ByteBuffer; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** * Reads data in an InputStream to a byte[]; data must be preceded by @@ -66,7 +63,7 @@ public class ByteArrayLengthHeaderSerializer extends AbstractByteArraySerializer private final int headerSize; - private final Log logger = LogFactory.getLog(this.getClass()); + private int headerAdjust; /** * Constructs the serializer using {@link #HEADER_SIZE_INT} @@ -90,6 +87,42 @@ public ByteArrayLengthHeaderSerializer(int headerSize) { this.headerSize = headerSize; } + /** + * Return true if the lenght header value includes its own length. + * @return true if the length includes the header length. + * @since 5.2 + */ + protected boolean isInclusive() { + return this.headerAdjust > 0; + } + + /** + * Set to true to set the length header to include the length of the header in + * addition to the payload. Valid header sizes are {@link #HEADER_SIZE_INT} (default), + * {@link #HEADER_SIZE_UNSIGNED_BYTE} and {@link #HEADER_SIZE_UNSIGNED_SHORT} and 4, 1 + * and 2 will be added to the payload length respectively. + * @param inclusive true to include the header length. + * @since 5.2 + * @see #inclusive() + */ + public void setInclusive(boolean inclusive) { + this.headerAdjust = inclusive ? this.headerSize : 0; + } + + /** + * Include the length of the header in addition to the payload. Valid header sizes are + * {@link #HEADER_SIZE_INT} (default), {@link #HEADER_SIZE_UNSIGNED_BYTE} and + * {@link #HEADER_SIZE_UNSIGNED_SHORT} and 4, 1 and 2 will be added to the payload + * length respectively. Fluent API form of {@link #setInclusive(boolean)}. + * @return the serializer. + * @since 5.2 + * @see #setInclusive(boolean) + */ + public ByteArrayLengthHeaderSerializer inclusive() { + setInclusive(true); + return this; + } + /** * Reads the header from the stream and then reads the provided length * from the stream and returns the data in a byte[]. Throws an @@ -102,7 +135,7 @@ public ByteArrayLengthHeaderSerializer(int headerSize) { */ @Override public byte[] deserialize(InputStream inputStream) throws IOException { - int messageLength = this.readHeader(inputStream); + int messageLength = this.readHeader(inputStream) - this.headerAdjust; if (this.logger.isDebugEnabled()) { this.logger.debug("Message length is " + messageLength); } @@ -135,7 +168,7 @@ public byte[] deserialize(InputStream inputStream) throws IOException { */ @Override public void serialize(byte[] bytes, OutputStream outputStream) throws IOException { - this.writeHeader(outputStream, bytes.length); + this.writeHeader(outputStream, bytes.length + this.headerAdjust); outputStream.write(bytes); } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/LengthHeaderSerializationTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/LengthHeaderSerializationTests.java index 80318388b7a..551298da3ab 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/LengthHeaderSerializationTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/LengthHeaderSerializationTests.java @@ -17,10 +17,11 @@ package org.springframework.integration.ip.tcp.serializer; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.Arrays; import org.junit.Before; @@ -52,7 +53,7 @@ public void setup() { } @Test - public void testInt() throws Exception { + public void testInt() throws IOException { AbstractByteArraySerializer serializer = new ByteArrayLengthHeaderSerializer(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); serializer.serialize(TEST.getBytes(), bos); @@ -65,16 +66,12 @@ public void testInt() throws Exception { bytes = serializer.deserialize(bis); assertThat(new String(bytes)).isEqualTo(TEST); bytes[0] = -1; - bis = new ByteArrayInputStream(bytes); - try { - bytes = serializer.deserialize(bis); - fail("Expected negative length"); - } - catch (IllegalArgumentException e) { } + ByteArrayInputStream bisBad = new ByteArrayInputStream(bytes); + assertThatIllegalArgumentException().isThrownBy(() -> serializer.deserialize(bisBad)); } @Test - public void testByte() throws Exception { + public void testByte() throws IOException { AbstractByteArraySerializer serializer = new ByteArrayLengthHeaderSerializer( ByteArrayLengthHeaderSerializer.HEADER_SIZE_UNSIGNED_BYTE); ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -85,15 +82,11 @@ public void testByte() throws Exception { bytes = serializer.deserialize(bis); assertThat(new String(bytes)).isEqualTo(test255); test255 += "x"; - try { - serializer.serialize(test255.getBytes(), bos); - fail("Expected overflow"); - } - catch (IllegalArgumentException e) { } + assertThatIllegalArgumentException().isThrownBy(() -> serializer.serialize(test255.getBytes(), bos)); } @Test - public void testShort1() throws Exception { + public void testShort1() throws IOException { AbstractByteArraySerializer serializer = new ByteArrayLengthHeaderSerializer( ByteArrayLengthHeaderSerializer.HEADER_SIZE_UNSIGNED_SHORT); ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -107,7 +100,7 @@ public void testShort1() throws Exception { } @Test - public void testShort2() throws Exception { + public void testShort2() throws IOException { AbstractByteArraySerializer serializer = new ByteArrayLengthHeaderSerializer( ByteArrayLengthHeaderSerializer.HEADER_SIZE_UNSIGNED_SHORT); serializer.setMaxMessageSize(0x10000); @@ -120,20 +113,63 @@ public void testShort2() throws Exception { bytes = serializer.deserialize(bis); assertThat(new String(bytes)).isEqualTo(testFFFF); testFFFF += "x"; - try { - serializer.serialize(testFFFF.getBytes(), bos); - fail("Expected overflow"); - } - catch (IllegalArgumentException e) { } + assertThatIllegalArgumentException().isThrownBy(() -> serializer.serialize(testFFFF.getBytes(), bos)); } @Test - public void testBad() throws Exception { - try { - new ByteArrayLengthHeaderSerializer(23); - fail("Expected illegal argument exception"); - } - catch (IllegalArgumentException e) { } + public void testBad() { + assertThatIllegalArgumentException().isThrownBy(() -> new ByteArrayLengthHeaderSerializer(23)); + } + + @Test + public void testIntInclusive() throws IOException { + ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer().inclusive(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + serializer.serialize(TEST.getBytes(), bos); + byte[] bytes = bos.toByteArray(); + assertThat(bytes[0]).isEqualTo((byte) 0); + assertThat(bytes[1]).isEqualTo((byte) 0); + assertThat(bytes[2]).isEqualTo((byte) 0); + assertThat(bytes[3]).isEqualTo((byte) (TEST.length() + 4)); + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + bytes = serializer.deserialize(bis); + assertThat(new String(bytes)).isEqualTo(TEST); + } + + @Test + public void testByteInclusive() throws IOException { + ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer( + ByteArrayLengthHeaderSerializer.HEADER_SIZE_UNSIGNED_BYTE).inclusive(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + serializer.serialize(TEST.getBytes(), bos); + byte[] bytes = bos.toByteArray(); + assertThat(bytes[0] & 0xff).isEqualTo((byte) (TEST.length() + 1)); + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + bytes = serializer.deserialize(bis); + assertThat(new String(bytes)).isEqualTo(TEST); + assertThatIllegalArgumentException().isThrownBy(() -> serializer.serialize(test255.getBytes(), bos)); + } + + @Test + public void testShort1Inclusive() throws IOException { + ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer( + ByteArrayLengthHeaderSerializer.HEADER_SIZE_UNSIGNED_SHORT).inclusive(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + serializer.serialize(TEST.getBytes(), bos); + byte[] bytes = bos.toByteArray(); + assertThat(bytes[0]).isEqualTo((byte) 0); + assertThat(bytes[1] & 0xff).isEqualTo(TEST.length() + 2); + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + bytes = serializer.deserialize(bis); + assertThat(new String(bytes)).isEqualTo(TEST); + } + @Test + public void testShort2Inclusive() { + AbstractByteArraySerializer serializer = new ByteArrayLengthHeaderSerializer( + ByteArrayLengthHeaderSerializer.HEADER_SIZE_UNSIGNED_SHORT).inclusive(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + assertThatIllegalArgumentException().isThrownBy(() -> serializer.serialize(testFFFF.getBytes(), bos)); } + } diff --git a/src/reference/asciidoc/ip.adoc b/src/reference/asciidoc/ip.adoc index a02a5145aa3..712b0e18f54 100644 --- a/src/reference/asciidoc/ip.adoc +++ b/src/reference/asciidoc/ip.adoc @@ -373,6 +373,7 @@ The following example shows a client connection factory that uses `java.net.Sock ---- ==== +[[tcp-codecs]] ==== Message Demarcation (Serializers and Deserializers) TCP is a streaming protocol. @@ -398,6 +399,8 @@ The default size of the length header is four bytes (an Integer), allowing for m However, the `length` header can be a single byte (unsigned) for messages up to 255 bytes, or an unsigned short (2 bytes) for messages up to (2^16 - 1) bytes. If you need any other format for the header, you can subclass `ByteArrayLengthHeaderSerializer` and provide implementations for the `readHeader` and `writeHeader` methods. The absolute maximum data size is (2^31 - 1) bytes. +Starting with version 5.2, the header value can include the length of the header in addition to the payload. +Set the `inclusive` property to enable that mechanism (it must be set to the same for producers and consumers). The `ByteArrayRawSerializer`^*^, converts a byte array to a stream of bytes and adds no additional message demarcation data. With this serializer (and deserializer), the end of a message is indicated by the client closing the socket in an orderly fashion. diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 1ed996667ec..22d9b701c67 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -10,4 +10,8 @@ If you are interested in more details, see the Issue Tracker tickets that were r [[x5.2-general]] === General Changes +[[5.2-tcp]] +==== TCP Changes +The length header used by the `ByteArrayLengthHeaderSerializer` can now include the length of the header in addition to the payload. +See <> for more information.