Skip to content

GH-2780: ByteArrayLengthHeaderSerializer.inclusive #2783

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


/**
* Reads data in an InputStream to a byte[]; data must be preceded by
Expand Down Expand Up @@ -66,7 +63,7 @@ public class ByteArrayLengthHeaderSerializer extends AbstractByteArraySerializer

private final int headerSize;

private final Log logger = LogFactory.getLog(this.getClass());
private int headerAdjust;

/**
* Constructs the serializer using {@link #HEADER_SIZE_INT}
Expand All @@ -90,6 +87,42 @@ public ByteArrayLengthHeaderSerializer(int headerSize) {
this.headerSize = headerSize;
}

/**
* Return true if the lenght header value includes its own length.
* @return true if the length includes the header length.
* @since 5.2
*/
protected boolean isInclusive() {
return this.headerAdjust > 0;
}

/**
* Set to true to set the length header to include the length of the header in
* addition to the payload. Valid header sizes are {@link #HEADER_SIZE_INT} (default),
* {@link #HEADER_SIZE_UNSIGNED_BYTE} and {@link #HEADER_SIZE_UNSIGNED_SHORT} and 4, 1
* and 2 will be added to the payload length respectively.
* @param inclusive true to include the header length.
* @since 5.2
* @see #inclusive()
*/
public void setInclusive(boolean inclusive) {
this.headerAdjust = inclusive ? this.headerSize : 0;
}

/**
* Include the length of the header in addition to the payload. Valid header sizes are
* {@link #HEADER_SIZE_INT} (default), {@link #HEADER_SIZE_UNSIGNED_BYTE} and
* {@link #HEADER_SIZE_UNSIGNED_SHORT} and 4, 1 and 2 will be added to the payload
* length respectively. Fluent API form of {@link #setInclusive(boolean)}.
* @return the serializer.
* @since 5.2
* @see #setInclusive(boolean)
*/
public ByteArrayLengthHeaderSerializer inclusive() {
setInclusive(true);
return this;
}

/**
* Reads the header from the stream and then reads the provided length
* from the stream and returns the data in a byte[]. Throws an
Expand All @@ -102,7 +135,7 @@ public ByteArrayLengthHeaderSerializer(int headerSize) {
*/
@Override
public byte[] deserialize(InputStream inputStream) throws IOException {
int messageLength = this.readHeader(inputStream);
int messageLength = this.readHeader(inputStream) - this.headerAdjust;
if (this.logger.isDebugEnabled()) {
this.logger.debug("Message length is " + messageLength);
}
Expand Down Expand Up @@ -135,7 +168,7 @@ public byte[] deserialize(InputStream inputStream) throws IOException {
*/
@Override
public void serialize(byte[] bytes, OutputStream outputStream) throws IOException {
this.writeHeader(outputStream, bytes.length);
this.writeHeader(outputStream, bytes.length + this.headerAdjust);
outputStream.write(bytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package org.springframework.integration.ip.tcp.serializer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.junit.Before;
Expand Down Expand Up @@ -52,7 +53,7 @@ public void setup() {
}

@Test
public void testInt() throws Exception {
public void testInt() throws IOException {
AbstractByteArraySerializer serializer = new ByteArrayLengthHeaderSerializer();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
serializer.serialize(TEST.getBytes(), bos);
Expand All @@ -65,16 +66,12 @@ public void testInt() throws Exception {
bytes = serializer.deserialize(bis);
assertThat(new String(bytes)).isEqualTo(TEST);
bytes[0] = -1;
bis = new ByteArrayInputStream(bytes);
try {
bytes = serializer.deserialize(bis);
fail("Expected negative length");
}
catch (IllegalArgumentException e) { }
ByteArrayInputStream bisBad = new ByteArrayInputStream(bytes);
assertThatIllegalArgumentException().isThrownBy(() -> serializer.deserialize(bisBad));
}

@Test
public void testByte() throws Exception {
public void testByte() throws IOException {
AbstractByteArraySerializer serializer = new ByteArrayLengthHeaderSerializer(
ByteArrayLengthHeaderSerializer.HEADER_SIZE_UNSIGNED_BYTE);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Expand All @@ -85,15 +82,11 @@ public void testByte() throws Exception {
bytes = serializer.deserialize(bis);
assertThat(new String(bytes)).isEqualTo(test255);
test255 += "x";
try {
serializer.serialize(test255.getBytes(), bos);
fail("Expected overflow");
}
catch (IllegalArgumentException e) { }
assertThatIllegalArgumentException().isThrownBy(() -> serializer.serialize(test255.getBytes(), bos));
}

@Test
public void testShort1() throws Exception {
public void testShort1() throws IOException {
AbstractByteArraySerializer serializer = new ByteArrayLengthHeaderSerializer(
ByteArrayLengthHeaderSerializer.HEADER_SIZE_UNSIGNED_SHORT);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Expand All @@ -107,7 +100,7 @@ public void testShort1() throws Exception {
}

@Test
public void testShort2() throws Exception {
public void testShort2() throws IOException {
AbstractByteArraySerializer serializer = new ByteArrayLengthHeaderSerializer(
ByteArrayLengthHeaderSerializer.HEADER_SIZE_UNSIGNED_SHORT);
serializer.setMaxMessageSize(0x10000);
Expand All @@ -120,20 +113,63 @@ public void testShort2() throws Exception {
bytes = serializer.deserialize(bis);
assertThat(new String(bytes)).isEqualTo(testFFFF);
testFFFF += "x";
try {
serializer.serialize(testFFFF.getBytes(), bos);
fail("Expected overflow");
}
catch (IllegalArgumentException e) { }
assertThatIllegalArgumentException().isThrownBy(() -> serializer.serialize(testFFFF.getBytes(), bos));
}

@Test
public void testBad() throws Exception {
try {
new ByteArrayLengthHeaderSerializer(23);
fail("Expected illegal argument exception");
}
catch (IllegalArgumentException e) { }
public void testBad() {
assertThatIllegalArgumentException().isThrownBy(() -> new ByteArrayLengthHeaderSerializer(23));
}

@Test
public void testIntInclusive() throws IOException {
ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer().inclusive();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
serializer.serialize(TEST.getBytes(), bos);
byte[] bytes = bos.toByteArray();
assertThat(bytes[0]).isEqualTo((byte) 0);
assertThat(bytes[1]).isEqualTo((byte) 0);
assertThat(bytes[2]).isEqualTo((byte) 0);
assertThat(bytes[3]).isEqualTo((byte) (TEST.length() + 4));
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
bytes = serializer.deserialize(bis);
assertThat(new String(bytes)).isEqualTo(TEST);
}

@Test
public void testByteInclusive() throws IOException {
ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer(
ByteArrayLengthHeaderSerializer.HEADER_SIZE_UNSIGNED_BYTE).inclusive();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
serializer.serialize(TEST.getBytes(), bos);
byte[] bytes = bos.toByteArray();
assertThat(bytes[0] & 0xff).isEqualTo((byte) (TEST.length() + 1));
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
bytes = serializer.deserialize(bis);
assertThat(new String(bytes)).isEqualTo(TEST);
assertThatIllegalArgumentException().isThrownBy(() -> serializer.serialize(test255.getBytes(), bos));
}

@Test
public void testShort1Inclusive() throws IOException {
ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer(
ByteArrayLengthHeaderSerializer.HEADER_SIZE_UNSIGNED_SHORT).inclusive();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
serializer.serialize(TEST.getBytes(), bos);
byte[] bytes = bos.toByteArray();
assertThat(bytes[0]).isEqualTo((byte) 0);
assertThat(bytes[1] & 0xff).isEqualTo(TEST.length() + 2);
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
bytes = serializer.deserialize(bis);
assertThat(new String(bytes)).isEqualTo(TEST);
}

@Test
public void testShort2Inclusive() {
AbstractByteArraySerializer serializer = new ByteArrayLengthHeaderSerializer(
ByteArrayLengthHeaderSerializer.HEADER_SIZE_UNSIGNED_SHORT).inclusive();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
assertThatIllegalArgumentException().isThrownBy(() -> serializer.serialize(testFFFF.getBytes(), bos));
}

}
3 changes: 3 additions & 0 deletions src/reference/asciidoc/ip.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ The following example shows a client connection factory that uses `java.net.Sock
----
====

[[tcp-codecs]]
==== Message Demarcation (Serializers and Deserializers)

TCP is a streaming protocol.
Expand All @@ -398,6 +399,8 @@ The default size of the length header is four bytes (an Integer), allowing for m
However, the `length` header can be a single byte (unsigned) for messages up to 255 bytes, or an unsigned short (2 bytes) for messages up to (2^16 - 1) bytes.
If you need any other format for the header, you can subclass `ByteArrayLengthHeaderSerializer` and provide implementations for the `readHeader` and `writeHeader` methods.
The absolute maximum data size is (2^31 - 1) bytes.
Starting with version 5.2, the header value can include the length of the header in addition to the payload.
Set the `inclusive` property to enable that mechanism (it must be set to the same for producers and consumers).

The `ByteArrayRawSerializer`^*^, converts a byte array to a stream of bytes and adds no additional message demarcation data.
With this serializer (and deserializer), the end of a message is indicated by the client closing the socket in an orderly fashion.
Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ If you are interested in more details, see the Issue Tracker tickets that were r
[[x5.2-general]]
=== General Changes

[[5.2-tcp]]
==== TCP Changes

The length header used by the `ByteArrayLengthHeaderSerializer` can now include the length of the header in addition to the payload.
See <<tcp-codecs>> for more information.