diff --git a/spring-core/src/test/java/org/springframework/core/io/ResourceTests.java b/spring-core/src/test/java/org/springframework/core/io/ResourceTests.java index 9385a210f99a..1d66bb629388 100644 --- a/spring-core/src/test/java/org/springframework/core/io/ResourceTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/ResourceTests.java @@ -44,7 +44,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.ParameterizedClass; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -68,82 +68,88 @@ */ class ResourceTests { - @ParameterizedTest - @MethodSource("resource") - void resourceIsValid(Resource resource) throws Exception { - assertThat(resource.getFilename()).isEqualTo("ResourceTests.class"); - assertThat(resource.getURL().getFile()).endsWith("ResourceTests.class"); - assertThat(resource.exists()).isTrue(); - assertThat(resource.isReadable()).isTrue(); - assertThat(resource.contentLength()).isGreaterThan(0); - assertThat(resource.lastModified()).isGreaterThan(0); - assertThat(resource.getContentAsByteArray()).containsExactly(Files.readAllBytes(Path.of(resource.getURI()))); - } + @Nested + @ParameterizedClass + @MethodSource("resources") + class ParameterizedResourceTests { - @ParameterizedTest - @MethodSource("resource") - void resourceCreateRelative(Resource resource) throws Exception { - Resource relative1 = resource.createRelative("ClassPathResourceTests.class"); - assertThat(relative1.getFilename()).isEqualTo("ClassPathResourceTests.class"); - assertThat(relative1.getURL().getFile().endsWith("ClassPathResourceTests.class")).isTrue(); - assertThat(relative1.exists()).isTrue(); - assertThat(relative1.isReadable()).isTrue(); - assertThat(relative1.contentLength()).isGreaterThan(0); - assertThat(relative1.lastModified()).isGreaterThan(0); - } + private final Resource resource; - @ParameterizedTest - @MethodSource("resource") - void resourceCreateRelativeWithFolder(Resource resource) throws Exception { - Resource relative2 = resource.createRelative("support/PathMatchingResourcePatternResolverTests.class"); - assertThat(relative2.getFilename()).isEqualTo("PathMatchingResourcePatternResolverTests.class"); - assertThat(relative2.getURL().getFile()).endsWith("PathMatchingResourcePatternResolverTests.class"); - assertThat(relative2.exists()).isTrue(); - assertThat(relative2.isReadable()).isTrue(); - assertThat(relative2.contentLength()).isGreaterThan(0); - assertThat(relative2.lastModified()).isGreaterThan(0); - } + public ParameterizedResourceTests(Resource resource) { + this.resource = resource; + } - @ParameterizedTest - @MethodSource("resource") - void resourceCreateRelativeWithDotPath(Resource resource) throws Exception { - Resource relative3 = resource.createRelative("../CollectionFactoryTests.class"); - assertThat(relative3.getFilename()).isEqualTo("CollectionFactoryTests.class"); - assertThat(relative3.getURL().getFile()).endsWith("CollectionFactoryTests.class"); - assertThat(relative3.exists()).isTrue(); - assertThat(relative3.isReadable()).isTrue(); - assertThat(relative3.contentLength()).isGreaterThan(0); - assertThat(relative3.lastModified()).isGreaterThan(0); - } + @Test + void resourceIsValid() throws Exception { + assertThat(resource.getFilename()).isEqualTo("ResourceTests.class"); + assertThat(resource.getURL().getFile()).endsWith("ResourceTests.class"); + assertThat(resource.exists()).isTrue(); + assertThat(resource.isReadable()).isTrue(); + assertThat(resource.contentLength()).isGreaterThan(0); + assertThat(resource.lastModified()).isGreaterThan(0); + assertThat(resource.getContentAsByteArray()).containsExactly(Files.readAllBytes(Path.of(resource.getURI()))); + } - @ParameterizedTest - @MethodSource("resource") - void resourceCreateRelativeUnknown(Resource resource) throws Exception { - Resource relative4 = resource.createRelative("X.class"); - assertThat(relative4.exists()).isFalse(); - assertThat(relative4.isReadable()).isFalse(); - assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(relative4::contentLength); - assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(relative4::lastModified); - assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(relative4::getInputStream); - assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(relative4::readableChannel); - assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(relative4::getContentAsByteArray); - assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(() -> relative4.getContentAsString(UTF_8)); - } + @Test + void resourceCreateRelative() throws Exception { + Resource relative1 = resource.createRelative("ClassPathResourceTests.class"); + assertThat(relative1.getFilename()).isEqualTo("ClassPathResourceTests.class"); + assertThat(relative1.getURL().getFile().endsWith("ClassPathResourceTests.class")).isTrue(); + assertThat(relative1.exists()).isTrue(); + assertThat(relative1.isReadable()).isTrue(); + assertThat(relative1.contentLength()).isGreaterThan(0); + assertThat(relative1.lastModified()).isGreaterThan(0); + } - private static Stream resource() throws URISyntaxException { - URL resourceClass = ResourceTests.class.getResource("ResourceTests.class"); - Path resourceClassFilePath = Paths.get(resourceClass.toURI()); - return Stream.of( - argumentSet("ClassPathResource", new ClassPathResource("org/springframework/core/io/ResourceTests.class")), - argumentSet("ClassPathResource with ClassLoader", new ClassPathResource("org/springframework/core/io/ResourceTests.class", ResourceTests.class.getClassLoader())), - argumentSet("ClassPathResource with Class", new ClassPathResource("ResourceTests.class", ResourceTests.class)), - argumentSet("FileSystemResource", new FileSystemResource(resourceClass.getFile())), - argumentSet("FileSystemResource with File", new FileSystemResource(new File(resourceClass.getFile()))), - argumentSet("FileSystemResource with File path", new FileSystemResource(resourceClassFilePath)), - argumentSet("UrlResource", new UrlResource(resourceClass)) - ); - } + @Test + void resourceCreateRelativeWithFolder() throws Exception { + Resource relative2 = resource.createRelative("support/PathMatchingResourcePatternResolverTests.class"); + assertThat(relative2.getFilename()).isEqualTo("PathMatchingResourcePatternResolverTests.class"); + assertThat(relative2.getURL().getFile()).endsWith("PathMatchingResourcePatternResolverTests.class"); + assertThat(relative2.exists()).isTrue(); + assertThat(relative2.isReadable()).isTrue(); + assertThat(relative2.contentLength()).isGreaterThan(0); + assertThat(relative2.lastModified()).isGreaterThan(0); + } + @Test + void resourceCreateRelativeWithDotPath() throws Exception { + Resource relative3 = resource.createRelative("../CollectionFactoryTests.class"); + assertThat(relative3.getFilename()).isEqualTo("CollectionFactoryTests.class"); + assertThat(relative3.getURL().getFile()).endsWith("CollectionFactoryTests.class"); + assertThat(relative3.exists()).isTrue(); + assertThat(relative3.isReadable()).isTrue(); + assertThat(relative3.contentLength()).isGreaterThan(0); + assertThat(relative3.lastModified()).isGreaterThan(0); + } + + @Test + void resourceCreateRelativeUnknown() throws Exception { + Resource relative4 = resource.createRelative("X.class"); + assertThat(relative4.exists()).isFalse(); + assertThat(relative4.isReadable()).isFalse(); + assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(relative4::contentLength); + assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(relative4::lastModified); + assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(relative4::getInputStream); + assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(relative4::readableChannel); + assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(relative4::getContentAsByteArray); + assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(() -> relative4.getContentAsString(UTF_8)); + } + + private static Stream resources() throws URISyntaxException { + URL resourceClass = ResourceTests.class.getResource("ResourceTests.class"); + Path resourceClassFilePath = Paths.get(resourceClass.toURI()); + return Stream.of( + argumentSet("ClassPathResource", new ClassPathResource("org/springframework/core/io/ResourceTests.class")), + argumentSet("ClassPathResource with ClassLoader", new ClassPathResource("org/springframework/core/io/ResourceTests.class", ResourceTests.class.getClassLoader())), + argumentSet("ClassPathResource with Class", new ClassPathResource("ResourceTests.class", ResourceTests.class)), + argumentSet("FileSystemResource", new FileSystemResource(resourceClass.getFile())), + argumentSet("FileSystemResource with File", new FileSystemResource(new File(resourceClass.getFile()))), + argumentSet("FileSystemResource with File path", new FileSystemResource(resourceClassFilePath)), + argumentSet("UrlResource", new UrlResource(resourceClass)) + ); + } + } @Nested class ByteArrayResourceTests { diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java index 137cfc4c804d..cfc4f216e934 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java @@ -25,6 +25,8 @@ import java.util.Arrays; import java.util.List; +import org.junit.jupiter.api.Test; + import org.springframework.core.testfixture.io.buffer.AbstractDataBufferAllocatingTests; import static org.assertj.core.api.Assertions.assertThat; @@ -38,10 +40,8 @@ */ class DataBufferTests extends AbstractDataBufferAllocatingTests { - @ParameterizedDataBufferAllocatingTest - void byteCountsAndPositions(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void byteCountsAndPositions() { DataBuffer buffer = createDataBuffer(2); assertThat(buffer.readPosition()).isEqualTo(0); @@ -81,10 +81,8 @@ void byteCountsAndPositions(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void readPositionSmallerThanZero(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void readPositionSmallerThanZero() { DataBuffer buffer = createDataBuffer(1); try { assertThatExceptionOfType(IndexOutOfBoundsException.class).isThrownBy(() -> buffer.readPosition(-1)); @@ -94,10 +92,8 @@ void readPositionSmallerThanZero(DataBufferFactory bufferFactory) { } } - @ParameterizedDataBufferAllocatingTest - void readPositionGreaterThanWritePosition(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void readPositionGreaterThanWritePosition() { DataBuffer buffer = createDataBuffer(1); try { assertThatExceptionOfType(IndexOutOfBoundsException.class).isThrownBy(() -> buffer.readPosition(1)); @@ -107,10 +103,8 @@ void readPositionGreaterThanWritePosition(DataBufferFactory bufferFactory) { } } - @ParameterizedDataBufferAllocatingTest - void writePositionSmallerThanReadPosition(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writePositionSmallerThanReadPosition() { DataBuffer buffer = createDataBuffer(2); try { buffer.write((byte) 'a'); @@ -122,10 +116,8 @@ void writePositionSmallerThanReadPosition(DataBufferFactory bufferFactory) { } } - @ParameterizedDataBufferAllocatingTest - void writePositionGreaterThanCapacity(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writePositionGreaterThanCapacity() { DataBuffer buffer = createDataBuffer(1); try { assertThatExceptionOfType(IndexOutOfBoundsException.class).isThrownBy(() -> buffer.writePosition(2)); @@ -135,10 +127,8 @@ void writePositionGreaterThanCapacity(DataBufferFactory bufferFactory) { } } - @ParameterizedDataBufferAllocatingTest - void writeAndRead(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writeAndRead() { DataBuffer buffer = createDataBuffer(5); buffer.write(new byte[]{'a', 'b', 'c'}); @@ -156,10 +146,8 @@ void writeAndRead(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void writeNullString(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writeNullString() { DataBuffer buffer = createDataBuffer(1); try { assertThatIllegalArgumentException().isThrownBy(() -> @@ -170,10 +158,8 @@ void writeNullString(DataBufferFactory bufferFactory) { } } - @ParameterizedDataBufferAllocatingTest - void writeNullCharset(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writeNullCharset() { DataBuffer buffer = createDataBuffer(1); try { assertThatIllegalArgumentException().isThrownBy(() -> @@ -184,10 +170,8 @@ void writeNullCharset(DataBufferFactory bufferFactory) { } } - @ParameterizedDataBufferAllocatingTest - void writeEmptyString(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writeEmptyString() { DataBuffer buffer = createDataBuffer(1); buffer.write("", StandardCharsets.UTF_8); @@ -196,10 +180,8 @@ void writeEmptyString(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void writeUtf8String(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writeUtf8String() { DataBuffer buffer = createDataBuffer(6); buffer.write("Spring", StandardCharsets.UTF_8); @@ -210,10 +192,8 @@ void writeUtf8String(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void writeUtf8StringOutGrowsCapacity(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writeUtf8StringOutGrowsCapacity() { DataBuffer buffer = createDataBuffer(5); buffer.write("Spring €", StandardCharsets.UTF_8); @@ -224,10 +204,8 @@ void writeUtf8StringOutGrowsCapacity(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void writeIsoString(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writeIsoString() { DataBuffer buffer = createDataBuffer(3); buffer.write("\u00A3", StandardCharsets.ISO_8859_1); @@ -238,10 +216,8 @@ void writeIsoString(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void writeMultipleUtf8String(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writeMultipleUtf8String() { DataBuffer buffer = createDataBuffer(1); buffer.write("abc", StandardCharsets.UTF_8); assertThat(buffer.readableByteCount()).isEqualTo(3); @@ -260,10 +236,8 @@ void writeMultipleUtf8String(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void toStringNullCharset(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void toStringNullCharset() { DataBuffer buffer = createDataBuffer(1); try { assertThatIllegalArgumentException().isThrownBy(() -> @@ -274,10 +248,8 @@ void toStringNullCharset(DataBufferFactory bufferFactory) { } } - @ParameterizedDataBufferAllocatingTest - void toStringUtf8(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void toStringUtf8() { String spring = "Spring"; byte[] bytes = spring.getBytes(StandardCharsets.UTF_8); DataBuffer buffer = createDataBuffer(bytes.length); @@ -289,10 +261,8 @@ void toStringUtf8(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void toStringSection(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void toStringSection() { String spring = "Spring"; byte[] bytes = spring.getBytes(StandardCharsets.UTF_8); DataBuffer buffer = createDataBuffer(bytes.length); @@ -304,10 +274,8 @@ void toStringSection(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void inputStream(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; - + @Test + void inputStream() throws Exception { DataBuffer buffer = createDataBuffer(4); buffer.write(new byte[]{'a', 'b', 'c', 'd', 'e'}); buffer.readPosition(1); @@ -389,10 +357,8 @@ void inputStream(DataBufferFactory bufferFactory) throws Exception { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void inputStreamReleaseOnClose(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; - + @Test + void inputStreamReleaseOnClose() throws Exception { DataBuffer buffer = createDataBuffer(3); byte[] bytes = {'a', 'b', 'c'}; buffer.write(bytes); @@ -407,10 +373,8 @@ void inputStreamReleaseOnClose(DataBufferFactory bufferFactory) throws Exception // AbstractDataBufferAllocatingTests.leakDetector will verify the buffer's release } - @ParameterizedDataBufferAllocatingTest - void outputStream(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; - + @Test + void outputStream() throws Exception { DataBuffer buffer = createDataBuffer(4); buffer.write((byte) 'a'); @@ -427,10 +391,8 @@ void outputStream(DataBufferFactory bufferFactory) throws Exception { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void expand(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void expand() { DataBuffer buffer = createDataBuffer(1); buffer.write((byte) 'a'); assertThat(buffer.capacity()).isEqualTo(1); @@ -441,11 +403,9 @@ void expand(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest + @Test @SuppressWarnings("deprecation") - void increaseCapacity(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + void increaseCapacity() { DataBuffer buffer = createDataBuffer(1); assertThat(buffer.capacity()).isEqualTo(1); @@ -455,11 +415,9 @@ void increaseCapacity(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest + @Test @SuppressWarnings("deprecation") - void decreaseCapacityLowReadPosition(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + void decreaseCapacityLowReadPosition() { DataBuffer buffer = createDataBuffer(2); buffer.writePosition(2); buffer.capacity(1); @@ -468,11 +426,9 @@ void decreaseCapacityLowReadPosition(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest + @Test @SuppressWarnings("deprecation") - void decreaseCapacityHighReadPosition(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + void decreaseCapacityHighReadPosition() { DataBuffer buffer = createDataBuffer(2); buffer.writePosition(2); buffer.readPosition(2); @@ -482,11 +438,9 @@ void decreaseCapacityHighReadPosition(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest + @Test @SuppressWarnings("deprecation") - void capacityLessThanZero(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + void capacityLessThanZero() { DataBuffer buffer = createDataBuffer(1); try { assertThatIllegalArgumentException().isThrownBy(() -> buffer.capacity(-1)); @@ -496,10 +450,8 @@ void capacityLessThanZero(DataBufferFactory bufferFactory) { } } - @ParameterizedDataBufferAllocatingTest - void writeByteBuffer(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writeByteBuffer() { DataBuffer buffer1 = createDataBuffer(1); buffer1.write((byte) 'a'); ByteBuffer buffer2 = createByteBuffer(2); @@ -525,10 +477,8 @@ private ByteBuffer createByteBuffer(int capacity) { return ByteBuffer.allocate(capacity); } - @ParameterizedDataBufferAllocatingTest - void writeDataBuffer(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writeDataBuffer() { DataBuffer buffer1 = createDataBuffer(1); buffer1.write((byte) 'a'); DataBuffer buffer2 = createDataBuffer(2); @@ -548,11 +498,9 @@ void writeDataBuffer(DataBufferFactory bufferFactory) { release(buffer1, buffer2, buffer3); } - @ParameterizedDataBufferAllocatingTest + @Test @SuppressWarnings("deprecation") - void asByteBuffer(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + void asByteBuffer() { DataBuffer buffer = createDataBuffer(4); buffer.write(new byte[]{'a', 'b', 'c'}); buffer.read(); // skip a @@ -570,10 +518,8 @@ void asByteBuffer(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void asByteBufferIndexLength(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void asByteBufferIndexLength() { DataBuffer buffer = createDataBuffer(3); buffer.write(new byte[]{'a', 'b'}); @@ -591,11 +537,9 @@ void asByteBufferIndexLength(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest + @Test @SuppressWarnings("deprecation") - void byteBufferContainsDataBufferChanges(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + void byteBufferContainsDataBufferChanges() { DataBuffer dataBuffer = createDataBuffer(1); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, 1); @@ -608,11 +552,9 @@ void byteBufferContainsDataBufferChanges(DataBufferFactory bufferFactory) { release(dataBuffer); } - @ParameterizedDataBufferAllocatingTest + @Test @SuppressWarnings("deprecation") - void dataBufferContainsByteBufferChanges(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + void dataBufferContainsByteBufferChanges() { DataBuffer dataBuffer = createDataBuffer(1); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, 1); @@ -625,11 +567,9 @@ void dataBufferContainsByteBufferChanges(DataBufferFactory bufferFactory) { release(dataBuffer); } - @ParameterizedDataBufferAllocatingTest + @Test @SuppressWarnings("deprecation") - void emptyAsByteBuffer(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + void emptyAsByteBuffer() { DataBuffer buffer = createDataBuffer(1); ByteBuffer result = buffer.asByteBuffer(); @@ -639,10 +579,8 @@ void emptyAsByteBuffer(DataBufferFactory bufferFactory) { } - @ParameterizedDataBufferAllocatingTest - void toByteBuffer(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void toByteBuffer() { DataBuffer buffer = createDataBuffer(4); buffer.write(new byte[]{'a', 'b', 'c'}); buffer.read(); // skip a @@ -659,10 +597,8 @@ void toByteBuffer(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void toByteBufferIndexLength(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void toByteBufferIndexLength() { DataBuffer buffer = createDataBuffer(3); buffer.write(new byte[]{'a', 'b', 'c'}); @@ -678,10 +614,8 @@ void toByteBufferIndexLength(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void toByteBufferDestination(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void toByteBufferDestination() { DataBuffer buffer = createDataBuffer(4); buffer.write(new byte[]{'a', 'b', 'c'}); @@ -700,10 +634,8 @@ void toByteBufferDestination(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void readableByteBuffers(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void readableByteBuffers() { DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(3); dataBuffer.write("abc".getBytes(StandardCharsets.UTF_8)); dataBuffer.readPosition(1); @@ -732,10 +664,8 @@ void readableByteBuffers(DataBufferFactory bufferFactory) { release(dataBuffer); } - @ParameterizedDataBufferAllocatingTest - void readableByteBuffersJoined(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void readableByteBuffersJoined() { DataBuffer dataBuffer = this.bufferFactory.join(Arrays.asList(stringBuffer("a"), stringBuffer("b"), stringBuffer("c"))); @@ -757,10 +687,8 @@ void readableByteBuffersJoined(DataBufferFactory bufferFactory) { release(dataBuffer); } - @ParameterizedDataBufferAllocatingTest - void writableByteBuffers(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writableByteBuffers() { DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(3); dataBuffer.write("ab".getBytes(StandardCharsets.UTF_8)); dataBuffer.readPosition(1); @@ -785,10 +713,8 @@ void writableByteBuffers(DataBufferFactory bufferFactory) { release(dataBuffer); } - @ParameterizedDataBufferAllocatingTest - void indexOf(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void indexOf() { DataBuffer buffer = createDataBuffer(3); buffer.write(new byte[]{'a', 'b', 'c'}); @@ -807,10 +733,8 @@ void indexOf(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void lastIndexOf(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void lastIndexOf() { DataBuffer buffer = createDataBuffer(3); buffer.write(new byte[]{'a', 'b', 'c'}); @@ -838,11 +762,9 @@ void lastIndexOf(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest + @Test @SuppressWarnings("deprecation") - void slice(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + void slice() { DataBuffer buffer = createDataBuffer(3); buffer.write(new byte[]{'a', 'b'}); @@ -864,11 +786,9 @@ void slice(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest + @Test @SuppressWarnings("deprecation") - void retainedSlice(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + void retainedSlice() { DataBuffer buffer = createDataBuffer(3); buffer.write(new byte[]{'a', 'b'}); @@ -891,11 +811,9 @@ void retainedSlice(DataBufferFactory bufferFactory) { release(buffer, slice); } - @ParameterizedDataBufferAllocatingTest + @Test @SuppressWarnings("deprecation") - void spr16351(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + void spr16351() { DataBuffer buffer = createDataBuffer(6); byte[] bytes = {'a', 'b', 'c', 'd', 'e', 'f'}; buffer.write(bytes); @@ -913,10 +831,8 @@ void spr16351(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void split(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void split() { DataBuffer buffer = createDataBuffer(3); buffer.write(new byte[]{'a', 'b'}); @@ -964,10 +880,8 @@ void split(DataBufferFactory bufferFactory) { release(buffer, buffer2, split, split2); } - @ParameterizedDataBufferAllocatingTest - void join(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void join() { DataBuffer composite = this.bufferFactory.join(Arrays.asList(stringBuffer("a"), stringBuffer("b"), stringBuffer("c"))); assertThat(composite.readableByteCount()).isEqualTo(3); @@ -979,10 +893,8 @@ void join(DataBufferFactory bufferFactory) { release(composite); } - @ParameterizedDataBufferAllocatingTest - void getByte(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void getByte() { DataBuffer buffer = stringBuffer("abc"); assertThat(buffer.getByte(0)).isEqualTo((byte) 'a'); @@ -994,8 +906,8 @@ void getByte(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest // gh-31605 - void shouldHonorSourceBuffersReadPosition(DataBufferFactory bufferFactory) { + @Test // gh-31605 + void shouldHonorSourceBuffersReadPosition() { DataBuffer dataBuffer = bufferFactory.wrap("ab".getBytes(StandardCharsets.UTF_8)); dataBuffer.readPosition(1); @@ -1005,10 +917,8 @@ void shouldHonorSourceBuffersReadPosition(DataBufferFactory bufferFactory) { assertThat(StandardCharsets.UTF_8.decode(byteBuffer).toString()).isEqualTo("b"); } - @ParameterizedDataBufferAllocatingTest // gh-31873 - void repeatedWrites(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test // gh-31873 + void repeatedWrites() { DataBuffer buffer = bufferFactory.allocateBuffer(256); String name = "Müller"; int repeatCount = 19; @@ -1022,10 +932,8 @@ void repeatedWrites(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void forEachByteProcessAll(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void forEachByteProcessAll() { List result = new ArrayList<>(); DataBuffer buffer = byteBuffer(new byte[]{'a', 'b', 'c', 'd'}); int index = buffer.forEachByte(0, 4, b -> { @@ -1038,10 +946,8 @@ void forEachByteProcessAll(DataBufferFactory bufferFactory) { } - @ParameterizedDataBufferAllocatingTest - void forEachByteProcessSome(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void forEachByteProcessSome() { List result = new ArrayList<>(); DataBuffer buffer = byteBuffer(new byte[]{'a', 'b', 'c', 'd'}); int index = buffer.forEachByte(0, 4, b -> { diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 00d95af03a18..9d061ce3242a 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -42,6 +42,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.mockito.stubbing.Answer; import org.reactivestreams.Publisher; @@ -70,7 +71,7 @@ * @author Arjen Poutsma * @author Sam Brannen */ -class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests { +class DataBufferUtilsTests { private final Resource resource; @@ -82,1210 +83,1133 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests { this.tempFile = Files.createTempFile("DataBufferUtilsTests", null); } + @Nested + class ParameterizedDataBufferUtilsTests extends AbstractDataBufferAllocatingTests { - @ParameterizedDataBufferAllocatingTest - void readInputStream(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - - Flux flux = DataBufferUtils.readInputStream( - this.resource::getInputStream, super.bufferFactory, 3); - - verifyReadData(flux); - } - - @ParameterizedDataBufferAllocatingTest - void readByteChannel(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; - - URI uri = this.resource.getURI(); - Flux result = - DataBufferUtils.readByteChannel(() -> FileChannel.open(Paths.get(uri), StandardOpenOption.READ), - super.bufferFactory, 3); - - verifyReadData(result); - } - - @ParameterizedDataBufferAllocatingTest - void readByteChannelError(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; - - ReadableByteChannel channel = mock(); - given(channel.read(any())) - .willAnswer(invocation -> { - ByteBuffer buffer = invocation.getArgument(0); - buffer.put("foo".getBytes(StandardCharsets.UTF_8)); - buffer.flip(); - return 3; - }) - .willThrow(new IOException()); - - Flux result = - DataBufferUtils.readByteChannel(() -> channel, super.bufferFactory, 3); - - StepVerifier.create(result) - .consumeNextWith(stringConsumer("foo")) - .expectError(IOException.class) - .verify(Duration.ofSeconds(3)); - } - - @ParameterizedDataBufferAllocatingTest - void readByteChannelCancel(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; - - URI uri = this.resource.getURI(); - Flux result = - DataBufferUtils.readByteChannel(() -> FileChannel.open(Paths.get(uri), StandardOpenOption.READ), - super.bufferFactory, 3); - - StepVerifier.create(result) - .consumeNextWith(stringConsumer("foo")) - .thenCancel() - .verify(); - } - - @ParameterizedDataBufferAllocatingTest - void readAsynchronousFileChannel(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; - - URI uri = this.resource.getURI(); - Flux flux = DataBufferUtils.readAsynchronousFileChannel( - () -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ), - super.bufferFactory, 3); - - verifyReadData(flux); - } - - @ParameterizedDataBufferAllocatingTest - void readAsynchronousFileChannelPosition(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; - - URI uri = this.resource.getURI(); - Flux flux = DataBufferUtils.readAsynchronousFileChannel( - () -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ), - 9, super.bufferFactory, 3); - - StepVerifier.create(flux) - .consumeNextWith(stringConsumer("qux")) - .expectComplete() - .verify(Duration.ofSeconds(5)); - } + @Test + void readInputStream() { + Flux flux = DataBufferUtils.readInputStream(resource::getInputStream, bufferFactory, 3); + verifyReadData(flux); + } - @ParameterizedDataBufferAllocatingTest - void readAsynchronousFileChannelError(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - - AsynchronousFileChannel channel = mock(); - willAnswer(invocation -> { - ByteBuffer byteBuffer = invocation.getArgument(0); - byteBuffer.put("foo".getBytes(StandardCharsets.UTF_8)); - long pos = invocation.getArgument(1); - assertThat(pos).isEqualTo(0); - Object attachment = invocation.getArgument(2); - CompletionHandler completionHandler = invocation.getArgument(3); - completionHandler.completed(3, attachment); - return null; - }).willAnswer(invocation -> { - Object attachment = invocation.getArgument(2); - CompletionHandler completionHandler = invocation.getArgument(3); - completionHandler.failed(new IOException(), attachment); - return null; - }) - .given(channel).read(any(), anyLong(), any(), any()); - - Flux result= - DataBufferUtils.readAsynchronousFileChannel(() -> channel, super.bufferFactory, 3); - - StepVerifier.create(result) - .consumeNextWith(stringConsumer("foo")) - .expectError(IOException.class) - .verify(Duration.ofSeconds(3)); - } + @Test + void readByteChannel() throws Exception { + URI uri = resource.getURI(); + Flux result = DataBufferUtils.readByteChannel(() -> FileChannel.open(Paths.get(uri), StandardOpenOption.READ), bufferFactory, 3); + verifyReadData(result); + } - @ParameterizedDataBufferAllocatingTest - void readAsynchronousFileChannelCancel(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; + @Test + void readByteChannelError() throws Exception { + ReadableByteChannel channel = mock(); + given(channel.read(any())) + .willAnswer(invocation -> { + ByteBuffer buffer = invocation.getArgument(0); + buffer.put("foo".getBytes(StandardCharsets.UTF_8)); + buffer.flip(); + return 3; + }) + .willThrow(new IOException()); - URI uri = this.resource.getURI(); - Flux flux = DataBufferUtils.readAsynchronousFileChannel( - () -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ), - super.bufferFactory, 3); + Flux result = + DataBufferUtils.readByteChannel(() -> channel, bufferFactory, 3); - StepVerifier.create(flux) - .consumeNextWith(stringConsumer("foo")) - .thenCancel() - .verify(); - } + StepVerifier.create(result) + .consumeNextWith(stringConsumer("foo")) + .expectError(IOException.class) + .verify(Duration.ofSeconds(3)); + } - @ParameterizedDataBufferAllocatingTest // gh-22107 - void readAsynchronousFileChannelCancelWithoutDemand(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; + @Test + void readByteChannelCancel() throws Exception { + URI uri = resource.getURI(); + Flux result = + DataBufferUtils.readByteChannel(() -> FileChannel.open(Paths.get(uri), StandardOpenOption.READ), + bufferFactory, 3); - URI uri = this.resource.getURI(); - Flux flux = DataBufferUtils.readAsynchronousFileChannel( - () -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ), - super.bufferFactory, 3); + StepVerifier.create(result) + .consumeNextWith(stringConsumer("foo")) + .thenCancel() + .verify(); + } - BaseSubscriber subscriber = new ZeroDemandSubscriber(); - flux.subscribe(subscriber); - subscriber.cancel(); - } + @Test + void readAsynchronousFileChannel() throws Exception { + URI uri = resource.getURI(); + Flux flux = DataBufferUtils.readAsynchronousFileChannel( + () -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ), + bufferFactory, 3); - @ParameterizedDataBufferAllocatingTest - void readPath(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; + verifyReadData(flux); + } - Flux flux = DataBufferUtils.read(this.resource.getFile().toPath(), super.bufferFactory, 3); + @Test + void readAsynchronousFileChannelPosition() throws Exception { + URI uri = resource.getURI(); + Flux flux = DataBufferUtils.readAsynchronousFileChannel( + () -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ), + 9, bufferFactory, 3); + + StepVerifier.create(flux) + .consumeNextWith(stringConsumer("qux")) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } - verifyReadData(flux); - } + @Test + void readAsynchronousFileChannelError() { + AsynchronousFileChannel channel = mock(); + willAnswer(invocation -> { + ByteBuffer byteBuffer = invocation.getArgument(0); + byteBuffer.put("foo".getBytes(StandardCharsets.UTF_8)); + long pos = invocation.getArgument(1); + assertThat(pos).isEqualTo(0); + Object attachment = invocation.getArgument(2); + CompletionHandler completionHandler = invocation.getArgument(3); + completionHandler.completed(3, attachment); + return null; + }).willAnswer(invocation -> { + Object attachment = invocation.getArgument(2); + CompletionHandler completionHandler = invocation.getArgument(3); + completionHandler.failed(new IOException(), attachment); + return null; + }) + .given(channel).read(any(), anyLong(), any(), any()); - @ParameterizedDataBufferAllocatingTest - void readResource(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + Flux result = + DataBufferUtils.readAsynchronousFileChannel(() -> channel, bufferFactory, 3); - Flux flux = DataBufferUtils.read(this.resource, super.bufferFactory, 3); + StepVerifier.create(result) + .consumeNextWith(stringConsumer("foo")) + .expectError(IOException.class) + .verify(Duration.ofSeconds(3)); + } - verifyReadData(flux); - } + @Test + void readAsynchronousFileChannelCancel() throws Exception { + URI uri = resource.getURI(); + Flux flux = DataBufferUtils.readAsynchronousFileChannel( + () -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ), + bufferFactory, 3); + + StepVerifier.create(flux) + .consumeNextWith(stringConsumer("foo")) + .thenCancel() + .verify(); + } - @ParameterizedDataBufferAllocatingTest - void readResourcePosition(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + @Test // gh-22107 + void readAsynchronousFileChannelCancelWithoutDemand() throws Exception { + URI uri = resource.getURI(); + Flux flux = DataBufferUtils.readAsynchronousFileChannel( + () -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ), + bufferFactory, 3); - Flux flux = DataBufferUtils.read(this.resource, 9, super.bufferFactory, 3); + BaseSubscriber subscriber = new ZeroDemandSubscriber(); + flux.subscribe(subscriber); + subscriber.cancel(); + } - StepVerifier.create(flux) - .consumeNextWith(stringConsumer("qux")) - .expectComplete() - .verify(Duration.ofSeconds(5)); - } + @Test + void readPath() throws Exception { + Flux flux = DataBufferUtils.read(resource.getFile().toPath(), bufferFactory, 3); - private void verifyReadData(Flux buffers) { - StepVerifier.create(buffers) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("bar")) - .consumeNextWith(stringConsumer("baz")) - .consumeNextWith(stringConsumer("qux")) - .expectComplete() - .verify(Duration.ofSeconds(3)); - } + verifyReadData(flux); + } - @ParameterizedDataBufferAllocatingTest - void readResourcePositionAndTakeUntil(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + @Test + void readResource() { + Flux flux = DataBufferUtils.read(resource, bufferFactory, 3); - Resource resource = new ClassPathResource("DataBufferUtilsTests.txt", getClass()); - Flux flux = DataBufferUtils.read(resource, 3, super.bufferFactory, 3); + verifyReadData(flux); + } - flux = DataBufferUtils.takeUntilByteCount(flux, 5); + @Test + void readResourcePosition() { + Flux flux = DataBufferUtils.read(resource, 9, bufferFactory, 3); + StepVerifier.create(flux) + .consumeNextWith(stringConsumer("qux")) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } - StepVerifier.create(flux) - .consumeNextWith(stringConsumer("bar")) - .consumeNextWith(stringConsumer("ba")) - .expectComplete() - .verify(Duration.ofSeconds(5)); - } + private void verifyReadData(Flux buffers) { + StepVerifier.create(buffers) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .consumeNextWith(stringConsumer("qux")) + .expectComplete() + .verify(Duration.ofSeconds(3)); + } - @ParameterizedDataBufferAllocatingTest - void readByteArrayResourcePositionAndTakeUntil(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + @Test + void readResourcePositionAndTakeUntil() { + Resource resource = new ClassPathResource("DataBufferUtilsTests.txt", getClass()); + Flux flux = DataBufferUtils.read(resource, 3, bufferFactory, 3); - Resource resource = new ByteArrayResource("foobarbazqux" .getBytes()); - Flux flux = DataBufferUtils.read(resource, 3, super.bufferFactory, 3); + flux = DataBufferUtils.takeUntilByteCount(flux, 5); - flux = DataBufferUtils.takeUntilByteCount(flux, 5); + StepVerifier.create(flux) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("ba")) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + @Test + void readByteArrayResourcePositionAndTakeUntil() { + Resource resource = new ByteArrayResource("foobarbazqux".getBytes()); + Flux flux = DataBufferUtils.read(resource, 3, bufferFactory, 3); - StepVerifier.create(flux) - .consumeNextWith(stringConsumer("bar")) - .consumeNextWith(stringConsumer("ba")) - .expectComplete() - .verify(Duration.ofSeconds(5)); - } + flux = DataBufferUtils.takeUntilByteCount(flux, 5); - @ParameterizedDataBufferAllocatingTest - void writeOutputStream(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; + StepVerifier.create(flux) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("ba")) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - DataBuffer baz = stringBuffer("baz"); - DataBuffer qux = stringBuffer("qux"); - Flux flux = Flux.just(foo, bar, baz, qux); + @Test + void writeOutputStream() throws Exception { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + DataBuffer qux = stringBuffer("qux"); + Flux flux = Flux.just(foo, bar, baz, qux); - OutputStream os = Files.newOutputStream(tempFile); + OutputStream os = Files.newOutputStream(tempFile); - Flux writeResult = DataBufferUtils.write(flux, os); - verifyWrittenData(writeResult); - os.close(); - } + Flux writeResult = DataBufferUtils.write(flux, os); + verifyWrittenData(writeResult); + os.close(); + } - @ParameterizedDataBufferAllocatingTest - void writeWritableByteChannel(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; + @Test + void writeWritableByteChannel() throws Exception { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + DataBuffer qux = stringBuffer("qux"); + Flux flux = Flux.just(foo, bar, baz, qux); - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - DataBuffer baz = stringBuffer("baz"); - DataBuffer qux = stringBuffer("qux"); - Flux flux = Flux.just(foo, bar, baz, qux); + WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE); - WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE); + Flux writeResult = DataBufferUtils.write(flux, channel); + verifyWrittenData(writeResult); + channel.close(); + } - Flux writeResult = DataBufferUtils.write(flux, channel); - verifyWrittenData(writeResult); - channel.close(); - } + @Test + void writeWritableByteChannelErrorInFlux() throws Exception { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + Flux flux = Flux.just(foo, bar).concatWith(Flux.error(new RuntimeException())); - @ParameterizedDataBufferAllocatingTest - void writeWritableByteChannelErrorInFlux(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; + WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE); - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - Flux flux = Flux.just(foo, bar).concatWith(Flux.error(new RuntimeException())); + Flux writeResult = DataBufferUtils.write(flux, channel); + StepVerifier.create(writeResult) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .expectError() + .verify(Duration.ofSeconds(5)); - WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE); + String result = String.join("", Files.readAllLines(tempFile)); - Flux writeResult = DataBufferUtils.write(flux, channel); - StepVerifier.create(writeResult) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("bar")) - .expectError() - .verify(Duration.ofSeconds(5)); + assertThat(result).isEqualTo("foobar"); + channel.close(); + } - String result = String.join("", Files.readAllLines(tempFile)); + @Test + void writeWritableByteChannelErrorInWrite() throws Exception { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + Flux flux = Flux.just(foo, bar); + + WritableByteChannel channel = mock(); + given(channel.write(any())) + .willAnswer(invocation -> { + ByteBuffer buffer = invocation.getArgument(0); + int written = buffer.remaining(); + buffer.position(buffer.limit()); + return written; + }) + .willThrow(new IOException()); - assertThat(result).isEqualTo("foobar"); - channel.close(); - } + Flux writeResult = DataBufferUtils.write(flux, channel); + StepVerifier.create(writeResult) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .expectError(IOException.class) + .verify(Duration.ofSeconds(3)); - @ParameterizedDataBufferAllocatingTest - void writeWritableByteChannelErrorInWrite(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; - - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - Flux flux = Flux.just(foo, bar); - - WritableByteChannel channel = mock(); - given(channel.write(any())) - .willAnswer(invocation -> { - ByteBuffer buffer = invocation.getArgument(0); - int written = buffer.remaining(); - buffer.position(buffer.limit()); - return written; - }) - .willThrow(new IOException()); - - Flux writeResult = DataBufferUtils.write(flux, channel); - StepVerifier.create(writeResult) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("bar")) - .expectError(IOException.class) - .verify(Duration.ofSeconds(3)); - - channel.close(); - } + channel.close(); + } - @ParameterizedDataBufferAllocatingTest - void writeWritableByteChannelCancel(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; + @Test + void writeWritableByteChannelCancel() throws Exception { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + Flux flux = Flux.just(foo, bar); - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - Flux flux = Flux.just(foo, bar); + WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE); - WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE); + Flux writeResult = DataBufferUtils.write(flux, channel); + StepVerifier.create(writeResult, 1) + .consumeNextWith(stringConsumer("foo")) + .thenCancel() + .verify(Duration.ofSeconds(5)); - Flux writeResult = DataBufferUtils.write(flux, channel); - StepVerifier.create(writeResult, 1) - .consumeNextWith(stringConsumer("foo")) - .thenCancel() - .verify(Duration.ofSeconds(5)); + String result = String.join("", Files.readAllLines(tempFile)); - String result = String.join("", Files.readAllLines(tempFile)); + assertThat(result).isEqualTo("foo"); + channel.close(); - assertThat(result).isEqualTo("foo"); - channel.close(); + flux.subscribe(DataBufferUtils::release); + } - flux.subscribe(DataBufferUtils::release); - } + @Test + void writeAsynchronousFileChannel() throws Exception { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + DataBuffer qux = stringBuffer("qux"); + Flux flux = Flux.just(foo, bar, baz, qux); - @ParameterizedDataBufferAllocatingTest - void writeAsynchronousFileChannel(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; + AsynchronousFileChannel channel = + AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE); - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - DataBuffer baz = stringBuffer("baz"); - DataBuffer qux = stringBuffer("qux"); - Flux flux = Flux.just(foo, bar, baz, qux); + Flux writeResult = DataBufferUtils.write(flux, channel); + verifyWrittenData(writeResult); + channel.close(); + } - AsynchronousFileChannel channel = - AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE); + private void verifyWrittenData(Flux writeResult) throws IOException { + StepVerifier.create(writeResult) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .consumeNextWith(stringConsumer("qux")) + .expectComplete() + .verify(Duration.ofSeconds(3)); - Flux writeResult = DataBufferUtils.write(flux, channel); - verifyWrittenData(writeResult); - channel.close(); - } + String result = String.join("", Files.readAllLines(tempFile)); - private void verifyWrittenData(Flux writeResult) throws IOException { - StepVerifier.create(writeResult) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("bar")) - .consumeNextWith(stringConsumer("baz")) - .consumeNextWith(stringConsumer("qux")) - .expectComplete() - .verify(Duration.ofSeconds(3)); + assertThat(result).isEqualTo("foobarbazqux"); + } - String result = String.join("", Files.readAllLines(tempFile)); + @Test + void writeAsynchronousFileChannelErrorInFlux() throws Exception { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + Flux flux = + Flux.just(foo, bar).concatWith(Mono.error(new RuntimeException())); - assertThat(result).isEqualTo("foobarbazqux"); - } + AsynchronousFileChannel channel = + AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE); - @ParameterizedDataBufferAllocatingTest - void writeAsynchronousFileChannelErrorInFlux(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; + Flux writeResult = DataBufferUtils.write(flux, channel); + StepVerifier.create(writeResult) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .expectError(RuntimeException.class) + .verify(); - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - Flux flux = - Flux.just(foo, bar).concatWith(Mono.error(new RuntimeException())); + String result = String.join("", Files.readAllLines(tempFile)); - AsynchronousFileChannel channel = - AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE); + assertThat(result).isEqualTo("foobar"); + channel.close(); + } - Flux writeResult = DataBufferUtils.write(flux, channel); - StepVerifier.create(writeResult) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("bar")) - .expectError(RuntimeException.class) - .verify(); + @Test + void writeAsynchronousFileChannelErrorInWrite() throws Exception { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + Flux flux = Flux.just(foo, bar); + + AsynchronousFileChannel channel = mock(); + willAnswer(invocation -> { + ByteBuffer buffer = invocation.getArgument(0); + long pos = invocation.getArgument(1); + assertThat(pos).isEqualTo(0); + Object attachment = invocation.getArgument(2); + CompletionHandler completionHandler = invocation.getArgument(3); + int written = buffer.remaining(); + buffer.position(buffer.limit()); + completionHandler.completed(written, attachment); + return null; + }) + .willAnswer(invocation -> { + Object attachment = invocation.getArgument(2); + CompletionHandler completionHandler = invocation.getArgument(3); + completionHandler.failed(new IOException(), attachment); + return null; + }) + .given(channel).write(any(), anyLong(), any(), any()); - String result = String.join("", Files.readAllLines(tempFile)); + Flux writeResult = DataBufferUtils.write(flux, channel); + StepVerifier.create(writeResult) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .expectError(IOException.class) + .verify(); - assertThat(result).isEqualTo("foobar"); - channel.close(); - } + channel.close(); + } - @ParameterizedDataBufferAllocatingTest - void writeAsynchronousFileChannelErrorInWrite(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; - - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - Flux flux = Flux.just(foo, bar); - - AsynchronousFileChannel channel = mock(); - willAnswer(invocation -> { - ByteBuffer buffer = invocation.getArgument(0); - long pos = invocation.getArgument(1); - assertThat(pos).isEqualTo(0); - Object attachment = invocation.getArgument(2); - CompletionHandler completionHandler = invocation.getArgument(3); - int written = buffer.remaining(); - buffer.position(buffer.limit()); - completionHandler.completed(written, attachment); - return null; - }) - .willAnswer(invocation -> { - Object attachment = invocation.getArgument(2); - CompletionHandler completionHandler = invocation.getArgument(3); - completionHandler.failed(new IOException(), attachment); - return null; - }) - .given(channel).write(any(), anyLong(), any(), any()); - - Flux writeResult = DataBufferUtils.write(flux, channel); - StepVerifier.create(writeResult) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("bar")) - .expectError(IOException.class) - .verify(); - - channel.close(); - } + @Test + void writeAsynchronousFileChannelCanceled() throws Exception { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + Flux flux = Flux.just(foo, bar); - @ParameterizedDataBufferAllocatingTest - void writeAsynchronousFileChannelCanceled(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; + AsynchronousFileChannel channel = + AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE); - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - Flux flux = Flux.just(foo, bar); + Flux writeResult = DataBufferUtils.write(flux, channel); + StepVerifier.create(writeResult, 1) + .consumeNextWith(stringConsumer("foo")) + .thenCancel() + .verify(); - AsynchronousFileChannel channel = - AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE); + String result = String.join("", Files.readAllLines(tempFile)); - Flux writeResult = DataBufferUtils.write(flux, channel); - StepVerifier.create(writeResult, 1) - .consumeNextWith(stringConsumer("foo")) - .thenCancel() - .verify(); + assertThat(result).isEqualTo("foo"); + channel.close(); - String result = String.join("", Files.readAllLines(tempFile)); + flux.subscribe(DataBufferUtils::release); + } - assertThat(result).isEqualTo("foo"); - channel.close(); + @Test + void writePath() throws Exception { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + Flux flux = Flux.just(foo, bar); - flux.subscribe(DataBufferUtils::release); - } + Mono result = DataBufferUtils.write(flux, tempFile); - @ParameterizedDataBufferAllocatingTest - void writePath(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; + StepVerifier.create(result) + .verifyComplete(); - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - Flux flux = Flux.just(foo, bar); + List written = Files.readAllLines(tempFile); + assertThat(written).contains("foobar"); + } - Mono result = DataBufferUtils.write(flux, tempFile); + @Test + void outputStreamPublisher() { + byte[] foo = "foo".getBytes(StandardCharsets.UTF_8); + byte[] bar = "bar".getBytes(StandardCharsets.UTF_8); + byte[] baz = "baz".getBytes(StandardCharsets.UTF_8); + + Publisher publisher = DataBufferUtils.outputStreamPublisher(outputStream -> { + try { + outputStream.write(foo); + outputStream.write(bar); + outputStream.write(baz); + } + catch (IOException ex) { + fail(ex.getMessage(), ex); + } + }, bufferFactory, Executors.newSingleThreadExecutor()); + + StepVerifier.create(publisher) + .consumeNextWith(stringConsumer("foobarbaz")) + .verifyComplete(); + } - StepVerifier.create(result) - .verifyComplete(); + @Test + void outputStreamPublisherFlush() { + byte[] foo = "foo".getBytes(StandardCharsets.UTF_8); + byte[] bar = "bar".getBytes(StandardCharsets.UTF_8); + byte[] baz = "baz".getBytes(StandardCharsets.UTF_8); + + Publisher publisher = DataBufferUtils.outputStreamPublisher(outputStream -> { + try { + outputStream.write(foo); + outputStream.flush(); + outputStream.write(bar); + outputStream.flush(); + outputStream.write(baz); + outputStream.flush(); + } + catch (IOException ex) { + fail(ex.getMessage(), ex); + } + }, bufferFactory, Executors.newSingleThreadExecutor()); + + StepVerifier.create(publisher) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .verifyComplete(); + } - List written = Files.readAllLines(tempFile); - assertThat(written).contains("foobar"); - } + @Test + void outputStreamPublisherChunkSize() { + byte[] foo = "foo".getBytes(StandardCharsets.UTF_8); + byte[] bar = "bar".getBytes(StandardCharsets.UTF_8); + byte[] baz = "baz".getBytes(StandardCharsets.UTF_8); + + Publisher publisher = DataBufferUtils.outputStreamPublisher(outputStream -> { + try { + outputStream.write(foo); + outputStream.write(bar); + outputStream.write(baz); + } + catch (IOException ex) { + fail(ex.getMessage(), ex); + } + }, bufferFactory, Executors.newSingleThreadExecutor(), 3); + + StepVerifier.create(publisher) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .verifyComplete(); + } - @ParameterizedDataBufferAllocatingTest - void outputStreamPublisher(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + @Test + void outputStreamPublisherCancel() throws InterruptedException { + byte[] foo = "foo".getBytes(StandardCharsets.UTF_8); + byte[] bar = "bar".getBytes(StandardCharsets.UTF_8); - byte[] foo = "foo".getBytes(StandardCharsets.UTF_8); - byte[] bar = "bar".getBytes(StandardCharsets.UTF_8); - byte[] baz = "baz".getBytes(StandardCharsets.UTF_8); + CountDownLatch latch = new CountDownLatch(1); - Publisher publisher = DataBufferUtils.outputStreamPublisher(outputStream -> { - try { - outputStream.write(foo); - outputStream.write(bar); - outputStream.write(baz); - } - catch (IOException ex) { - fail(ex.getMessage(), ex); - } - }, super.bufferFactory, Executors.newSingleThreadExecutor()); + Publisher publisher = DataBufferUtils.outputStreamPublisher(outputStream -> { + try { + assertThatIOException() + .isThrownBy(() -> { + outputStream.write(foo); + outputStream.flush(); + outputStream.write(bar); + outputStream.flush(); + }) + .withMessage("Subscription has been terminated"); + } + finally { + latch.countDown(); + } + }, bufferFactory, Executors.newSingleThreadExecutor()); + + StepVerifier.create(publisher, 1) + .consumeNextWith(stringConsumer("foo")) + .thenCancel() + .verify(); - StepVerifier.create(publisher) - .consumeNextWith(stringConsumer("foobarbaz")) - .verifyComplete(); - } + latch.await(); + } - @ParameterizedDataBufferAllocatingTest - void outputStreamPublisherFlush(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - - byte[] foo = "foo".getBytes(StandardCharsets.UTF_8); - byte[] bar = "bar".getBytes(StandardCharsets.UTF_8); - byte[] baz = "baz".getBytes(StandardCharsets.UTF_8); - - Publisher publisher = DataBufferUtils.outputStreamPublisher(outputStream -> { - try { - outputStream.write(foo); - outputStream.flush(); - outputStream.write(bar); - outputStream.flush(); - outputStream.write(baz); - outputStream.flush(); - } - catch (IOException ex) { - fail(ex.getMessage(), ex); - } - }, super.bufferFactory, Executors.newSingleThreadExecutor()); + @Test + void outputStreamPublisherClosed() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); - StepVerifier.create(publisher) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("bar")) - .consumeNextWith(stringConsumer("baz")) - .verifyComplete(); - } + Publisher publisher = DataBufferUtils.outputStreamPublisher(outputStream -> { + try { + OutputStreamWriter writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8); + writer.write("foo"); + writer.close(); + assertThatIOException().isThrownBy(() -> writer.write("bar")) + .withMessage("Stream closed"); + } + catch (IOException ex) { + fail(ex.getMessage(), ex); + } + finally { + latch.countDown(); + } + }, bufferFactory, Executors.newSingleThreadExecutor()); + + StepVerifier.create(publisher) + .consumeNextWith(stringConsumer("foo")) + .verifyComplete(); - @ParameterizedDataBufferAllocatingTest - void outputStreamPublisherChunkSize(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + latch.await(); + } - byte[] foo = "foo".getBytes(StandardCharsets.UTF_8); - byte[] bar = "bar".getBytes(StandardCharsets.UTF_8); - byte[] baz = "baz".getBytes(StandardCharsets.UTF_8); + @Test + void inputStreamSubscriberChunkSize() { + genericInputStreamSubscriberTest( + bufferFactory, 3, 3, 64, List.of("foo", "bar", "baz"), List.of("foo", "bar", "baz")); + } - Publisher publisher = DataBufferUtils.outputStreamPublisher(outputStream -> { - try { - outputStream.write(foo); - outputStream.write(bar); - outputStream.write(baz); - } - catch (IOException ex) { - fail(ex.getMessage(), ex); - } - }, super.bufferFactory, Executors.newSingleThreadExecutor(), 3); + @Test + void inputStreamSubscriberChunkSize2() { + genericInputStreamSubscriberTest( + bufferFactory, 3, 3, 1, List.of("foo", "bar", "baz"), List.of("foo", "bar", "baz")); + } - StepVerifier.create(publisher) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("bar")) - .consumeNextWith(stringConsumer("baz")) - .verifyComplete(); - } + @Test + void inputStreamSubscriberChunkSize3() { + genericInputStreamSubscriberTest(bufferFactory, 3, 12, 1, List.of("foo", "bar", "baz"), List.of("foobarbaz")); + } - @ParameterizedDataBufferAllocatingTest - void outputStreamPublisherCancel(DataBufferFactory bufferFactory) throws InterruptedException { - super.bufferFactory = bufferFactory; + @Test + void inputStreamSubscriberChunkSize4() { + genericInputStreamSubscriberTest( + bufferFactory, 3, 1, 1, List.of("foo", "bar", "baz"), List.of("f", "o", "o", "b", "a", "r", "b", "a", "z")); + } - byte[] foo = "foo".getBytes(StandardCharsets.UTF_8); - byte[] bar = "bar".getBytes(StandardCharsets.UTF_8); + @Test + void inputStreamSubscriberChunkSize5() { + genericInputStreamSubscriberTest( + bufferFactory, 3, 2, 1, List.of("foo", "bar", "baz"), List.of("fo", "ob", "ar", "ba", "z")); + } - CountDownLatch latch = new CountDownLatch(1); + @Test + void inputStreamSubscriberChunkSize6() { + genericInputStreamSubscriberTest( + bufferFactory, 1, 3, 1, List.of("foo", "bar", "baz"), List.of("foo", "bar", "baz")); + } - Publisher publisher = DataBufferUtils.outputStreamPublisher(outputStream -> { - try { - assertThatIOException() - .isThrownBy(() -> { - outputStream.write(foo); - outputStream.flush(); - outputStream.write(bar); - outputStream.flush(); - }) - .withMessage("Subscription has been terminated"); - } - finally { - latch.countDown(); - } - }, super.bufferFactory, Executors.newSingleThreadExecutor()); + @Test + void inputStreamSubscriberChunkSize7() { + genericInputStreamSubscriberTest( + bufferFactory, 1, 3, 64, List.of("foo", "bar", "baz"), List.of("foo", "bar", "baz")); + } - StepVerifier.create(publisher, 1) - .consumeNextWith(stringConsumer("foo")) - .thenCancel() - .verify(); + void genericInputStreamSubscriberTest( + DataBufferFactory factory, int writeChunkSize, int readChunkSize, int bufferSize, + List input, List expectedOutput) { - latch.await(); - } + bufferFactory = factory; - @ParameterizedDataBufferAllocatingTest - void outputStreamPublisherClosed(DataBufferFactory bufferFactory) throws InterruptedException { - super.bufferFactory = bufferFactory; + Publisher publisher = DataBufferUtils.outputStreamPublisher( + out -> { + try { + for (String word : input) { + out.write(word.getBytes(StandardCharsets.UTF_8)); + } + } + catch (IOException ex) { + fail(ex.getMessage(), ex); + } + }, + bufferFactory, Executors.newSingleThreadExecutor(), writeChunkSize); - CountDownLatch latch = new CountDownLatch(1); + byte[] chunk = new byte[readChunkSize]; + List words = new ArrayList<>(); - Publisher publisher = DataBufferUtils.outputStreamPublisher(outputStream -> { - try { - OutputStreamWriter writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8); - writer.write("foo"); - writer.close(); - assertThatIOException().isThrownBy(() -> writer.write("bar")) - .withMessage("Stream closed"); - } - catch (IOException ex) { - fail(ex.getMessage(), ex); + try (InputStream in = DataBufferUtils.subscriberInputStream(publisher, bufferSize)) { + int read; + while ((read = in.read(chunk)) > -1) { + words.add(new String(chunk, 0, read, StandardCharsets.UTF_8)); + } } - finally { - latch.countDown(); - } - }, super.bufferFactory, Executors.newSingleThreadExecutor()); - - StepVerifier.create(publisher) - .consumeNextWith(stringConsumer("foo")) - .verifyComplete(); - - latch.await(); - } - - @ParameterizedDataBufferAllocatingTest - void inputStreamSubscriberChunkSize(DataBufferFactory factory) { - genericInputStreamSubscriberTest( - factory, 3, 3, 64, List.of("foo", "bar", "baz"), List.of("foo", "bar", "baz")); - } - - @ParameterizedDataBufferAllocatingTest - void inputStreamSubscriberChunkSize2(DataBufferFactory factory) { - genericInputStreamSubscriberTest( - factory, 3, 3, 1, List.of("foo", "bar", "baz"), List.of("foo", "bar", "baz")); - } - - @ParameterizedDataBufferAllocatingTest - void inputStreamSubscriberChunkSize3(DataBufferFactory factory) { - genericInputStreamSubscriberTest(factory, 3, 12, 1, List.of("foo", "bar", "baz"), List.of("foobarbaz")); - } - - @ParameterizedDataBufferAllocatingTest - void inputStreamSubscriberChunkSize4(DataBufferFactory factory) { - genericInputStreamSubscriberTest( - factory, 3, 1, 1, List.of("foo", "bar", "baz"), List.of("f", "o", "o", "b", "a", "r", "b", "a", "z")); - } - - @ParameterizedDataBufferAllocatingTest - void inputStreamSubscriberChunkSize5(DataBufferFactory factory) { - genericInputStreamSubscriberTest( - factory, 3, 2, 1, List.of("foo", "bar", "baz"), List.of("fo", "ob", "ar", "ba", "z")); - } - - @ParameterizedDataBufferAllocatingTest - void inputStreamSubscriberChunkSize6(DataBufferFactory factory) { - genericInputStreamSubscriberTest( - factory, 1, 3, 1, List.of("foo", "bar", "baz"), List.of("foo", "bar", "baz")); - } - - @ParameterizedDataBufferAllocatingTest - void inputStreamSubscriberChunkSize7(DataBufferFactory factory) { - genericInputStreamSubscriberTest( - factory, 1, 3, 64, List.of("foo", "bar", "baz"), List.of("foo", "bar", "baz")); - } - - void genericInputStreamSubscriberTest( - DataBufferFactory factory, int writeChunkSize, int readChunkSize, int bufferSize, - List input, List expectedOutput) { - - super.bufferFactory = factory; - - Publisher publisher = DataBufferUtils.outputStreamPublisher( - out -> { - try { - for (String word : input) { - out.write(word.getBytes(StandardCharsets.UTF_8)); - } - } - catch (IOException ex) { - fail(ex.getMessage(), ex); - } - }, - super.bufferFactory, Executors.newSingleThreadExecutor(), writeChunkSize); - - byte[] chunk = new byte[readChunkSize]; - List words = new ArrayList<>(); - - try (InputStream in = DataBufferUtils.subscriberInputStream(publisher, bufferSize)) { - int read; - while ((read = in.read(chunk)) > -1) { - words.add(new String(chunk, 0, read, StandardCharsets.UTF_8)); + catch (IOException e) { + throw new RuntimeException(e); } + assertThat(words).containsExactlyElementsOf(expectedOutput); } - catch (IOException e) { - throw new RuntimeException(e); - } - assertThat(words).containsExactlyElementsOf(expectedOutput); - } - @ParameterizedDataBufferAllocatingTest - void inputStreamSubscriberError(DataBufferFactory factory) { - super.bufferFactory = factory; + @Test + void inputStreamSubscriberError() { + var input = List.of("foo ", "bar ", "baz"); - var input = List.of("foo ", "bar ", "baz"); - - Publisher publisher = DataBufferUtils.outputStreamPublisher( - out -> { - try { - for (String word : input) { - out.write(word.getBytes(StandardCharsets.UTF_8)); + Publisher publisher = DataBufferUtils.outputStreamPublisher( + out -> { + try { + for (String word : input) { + out.write(word.getBytes(StandardCharsets.UTF_8)); + } + throw new RuntimeException("boom"); } - throw new RuntimeException("boom"); - } - catch (IOException ex) { - fail(ex.getMessage(), ex); - } - }, - super.bufferFactory, Executors.newSingleThreadExecutor(), 1); - - - RuntimeException error = null; - byte[] chunk = new byte[4]; - List words = new ArrayList<>(); - - try (InputStream in = DataBufferUtils.subscriberInputStream(publisher, 1)) { - int read; - while ((read = in.read(chunk)) > -1) { - words.add(new String(chunk, 0, read, StandardCharsets.UTF_8)); - } - } - catch (IOException e) { - throw new RuntimeException(e); - } - catch (RuntimeException e) { - error = e; - } - assertThat(words).containsExactlyElementsOf(List.of("foo ", "bar ", "baz")); - assertThat(error).hasMessage("boom"); - } - - @ParameterizedDataBufferAllocatingTest - void inputStreamSubscriberMixedReadMode(DataBufferFactory factory) { - super.bufferFactory = factory; - - var input = List.of("foo ", "bar ", "baz"); - - Publisher publisher = DataBufferUtils.outputStreamPublisher( - out -> { - try { - for (String word : input) { - out.write(word.getBytes(StandardCharsets.UTF_8)); + catch (IOException ex) { + fail(ex.getMessage(), ex); } - } - catch (IOException ex) { - fail(ex.getMessage(), ex); - } - }, - super.bufferFactory, Executors.newSingleThreadExecutor(), 1); - + }, + bufferFactory, Executors.newSingleThreadExecutor(), 1); - byte[] chunk = new byte[3]; - ArrayList words = new ArrayList<>(); + RuntimeException error = null; + byte[] chunk = new byte[4]; + List words = new ArrayList<>(); - try (InputStream inputStream = DataBufferUtils.subscriberInputStream(publisher, 1)) { - words.add(new String(chunk,0, inputStream.read(chunk), StandardCharsets.UTF_8)); - assertThat(inputStream.read()).isEqualTo(' ' & 0xFF); - words.add(new String(chunk,0, inputStream.read(chunk), StandardCharsets.UTF_8)); - assertThat(inputStream.read()).isEqualTo(' ' & 0xFF); - words.add(new String(chunk,0, inputStream.read(chunk), StandardCharsets.UTF_8)); - assertThat(inputStream.read()).isEqualTo(-1); - } - catch (IOException e) { - throw new RuntimeException(e); + try (InputStream in = DataBufferUtils.subscriberInputStream(publisher, 1)) { + int read; + while ((read = in.read(chunk)) > -1) { + words.add(new String(chunk, 0, read, StandardCharsets.UTF_8)); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + catch (RuntimeException e) { + error = e; + } + assertThat(words).containsExactlyElementsOf(List.of("foo ", "bar ", "baz")); + assertThat(error).hasMessage("boom"); } - assertThat(words).containsExactlyElementsOf(List.of("foo", "bar", "baz")); - } - - @ParameterizedDataBufferAllocatingTest - void inputStreamSubscriberClose(DataBufferFactory bufferFactory) throws InterruptedException { - for (int i = 1; i < 100; i++) { - CountDownLatch latch = new CountDownLatch(1); - super.bufferFactory = bufferFactory; - var input = List.of("foo", "bar", "baz"); + @Test + void inputStreamSubscriberMixedReadMode() { + var input = List.of("foo ", "bar ", "baz"); Publisher publisher = DataBufferUtils.outputStreamPublisher( out -> { try { - assertThatIOException() - .isThrownBy(() -> { - for (String word : input) { - out.write(word.getBytes(StandardCharsets.UTF_8)); - out.flush(); - } - }) - .withMessage("Subscription has been terminated"); + for (String word : input) { + out.write(word.getBytes(StandardCharsets.UTF_8)); + } } - finally { - latch.countDown(); + catch (IOException ex) { + fail(ex.getMessage(), ex); } }, - super.bufferFactory, Executors.newSingleThreadExecutor(), 1); - + bufferFactory, Executors.newSingleThreadExecutor(), 1); byte[] chunk = new byte[3]; ArrayList words = new ArrayList<>(); - try (InputStream in = DataBufferUtils.subscriberInputStream(publisher, ThreadLocalRandom.current().nextInt(1, 4))) { - in.read(chunk); - String word = new String(chunk, StandardCharsets.UTF_8); - words.add(word); + try (InputStream inputStream = DataBufferUtils.subscriberInputStream(publisher, 1)) { + words.add(new String(chunk, 0, inputStream.read(chunk), StandardCharsets.UTF_8)); + assertThat(inputStream.read()).isEqualTo(' ' & 0xFF); + words.add(new String(chunk, 0, inputStream.read(chunk), StandardCharsets.UTF_8)); + assertThat(inputStream.read()).isEqualTo(' ' & 0xFF); + words.add(new String(chunk, 0, inputStream.read(chunk), StandardCharsets.UTF_8)); + assertThat(inputStream.read()).isEqualTo(-1); } catch (IOException e) { throw new RuntimeException(e); } - assertThat(words).containsExactlyElementsOf(List.of("foo")); - latch.await(); + assertThat(words).containsExactlyElementsOf(List.of("foo", "bar", "baz")); } - } - @ParameterizedDataBufferAllocatingTest - void readAndWriteByteChannel(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; + @Test + void inputStreamSubscriberClose() throws InterruptedException { + for (int i = 1; i < 100; i++) { + CountDownLatch latch = new CountDownLatch(1); - Path source = Paths.get( - DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI()); - Flux sourceFlux = - DataBufferUtils - .readByteChannel(() -> FileChannel.open(source, StandardOpenOption.READ), - super.bufferFactory, 3); + var input = List.of("foo", "bar", "baz"); - Path destination = Files.createTempFile("DataBufferUtilsTests", null); - WritableByteChannel channel = Files.newByteChannel(destination, StandardOpenOption.WRITE); - - DataBufferUtils.write(sourceFlux, channel) - .subscribe(DataBufferUtils.releaseConsumer(), - throwable -> { - throw new AssertionError(throwable.getMessage(), throwable); - }, - () -> { + Publisher publisher = DataBufferUtils.outputStreamPublisher( + out -> { try { - String expected = String.join("", Files.readAllLines(source)); - String result = String.join("", Files.readAllLines(destination)); - assertThat(result).isEqualTo(expected); - } - catch (IOException e) { - throw new AssertionError(e.getMessage(), e); + assertThatIOException() + .isThrownBy(() -> { + for (String word : input) { + out.write(word.getBytes(StandardCharsets.UTF_8)); + out.flush(); + } + }) + .withMessage("Subscription has been terminated"); } finally { - DataBufferUtils.closeChannel(channel); - } - }); - } - - @ParameterizedDataBufferAllocatingTest - void readAndWriteAsynchronousFileChannel(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; - - Path source = Paths.get( - DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI()); - Flux sourceFlux = DataBufferUtils.readAsynchronousFileChannel( - () -> AsynchronousFileChannel.open(source, StandardOpenOption.READ), - super.bufferFactory, 3); - - Path destination = Files.createTempFile("DataBufferUtilsTests", null); - AsynchronousFileChannel channel = - AsynchronousFileChannel.open(destination, StandardOpenOption.WRITE); - - CountDownLatch latch = new CountDownLatch(1); - - DataBufferUtils.write(sourceFlux, channel) - .subscribe(DataBufferUtils::release, - throwable -> { - throw new AssertionError(throwable.getMessage(), throwable); - }, - () -> { - try { - String expected = String.join("", Files.readAllLines(source)); - String result = String.join("", Files.readAllLines(destination)); - - assertThat(result).isEqualTo(expected); latch.countDown(); - - } - catch (IOException e) { - throw new AssertionError(e.getMessage(), e); } - finally { - DataBufferUtils.closeChannel(channel); - } - }); - - latch.await(); - } - - @ParameterizedDataBufferAllocatingTest - void takeUntilByteCount(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - - Flux result = DataBufferUtils.takeUntilByteCount( - Flux.just(stringBuffer("foo"), stringBuffer("bar")), 5L); + }, + bufferFactory, Executors.newSingleThreadExecutor(), 1); + + byte[] chunk = new byte[3]; + ArrayList words = new ArrayList<>(); + + try (InputStream in = DataBufferUtils.subscriberInputStream(publisher, ThreadLocalRandom.current().nextInt(1, 4))) { + in.read(chunk); + String word = new String(chunk, StandardCharsets.UTF_8); + words.add(word); + } + catch (IOException e) { + throw new RuntimeException(e); + } + assertThat(words).containsExactlyElementsOf(List.of("foo")); + latch.await(); + } + } - StepVerifier.create(result) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("ba")) - .expectComplete() - .verify(Duration.ofSeconds(5)); - } + @Test + void readAndWriteByteChannel() throws Exception { + Path source = Paths.get( + DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI()); + Flux sourceFlux = + DataBufferUtils + .readByteChannel(() -> FileChannel.open(source, StandardOpenOption.READ), + bufferFactory, 3); + + Path destination = Files.createTempFile("DataBufferUtilsTests", null); + WritableByteChannel channel = Files.newByteChannel(destination, StandardOpenOption.WRITE); + + DataBufferUtils.write(sourceFlux, channel) + .subscribe(DataBufferUtils.releaseConsumer(), + throwable -> { + throw new AssertionError(throwable.getMessage(), throwable); + }, + () -> { + try { + String expected = String.join("", Files.readAllLines(source)); + String result = String.join("", Files.readAllLines(destination)); + assertThat(result).isEqualTo(expected); + } + catch (IOException e) { + throw new AssertionError(e.getMessage(), e); + } + finally { + DataBufferUtils.closeChannel(channel); + } + }); + } - @ParameterizedDataBufferAllocatingTest - void takeUntilByteCountCanceled(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - - Flux source = Flux.concat( - deferStringBuffer("foo"), - deferStringBuffer("bar") - ); - Flux result = DataBufferUtils.takeUntilByteCount( - source, 5L); - - StepVerifier.create(result) - .consumeNextWith(stringConsumer("foo")) - .thenCancel() - .verify(Duration.ofSeconds(5)); - } + @Test + void readAndWriteAsynchronousFileChannel() throws Exception { + Path source = Paths.get( + DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI()); + Flux sourceFlux = DataBufferUtils.readAsynchronousFileChannel( + () -> AsynchronousFileChannel.open(source, StandardOpenOption.READ), + bufferFactory, 3); - @ParameterizedDataBufferAllocatingTest - void takeUntilByteCountError(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + Path destination = Files.createTempFile("DataBufferUtilsTests", null); + AsynchronousFileChannel channel = + AsynchronousFileChannel.open(destination, StandardOpenOption.WRITE); - Flux source = Flux.concat( - Mono.defer(() -> Mono.just(stringBuffer("foo"))), - Mono.error(new RuntimeException()) - ); + CountDownLatch latch = new CountDownLatch(1); - Flux result = DataBufferUtils.takeUntilByteCount(source, 5L); + DataBufferUtils.write(sourceFlux, channel) + .subscribe(DataBufferUtils::release, + throwable -> { + throw new AssertionError(throwable.getMessage(), throwable); + }, + () -> { + try { + String expected = String.join("", Files.readAllLines(source)); + String result = String.join("", Files.readAllLines(destination)); + + assertThat(result).isEqualTo(expected); + latch.countDown(); + + } + catch (IOException e) { + throw new AssertionError(e.getMessage(), e); + } + finally { + DataBufferUtils.closeChannel(channel); + } + }); - StepVerifier.create(result) - .consumeNextWith(stringConsumer("foo")) - .expectError(RuntimeException.class) - .verify(Duration.ofSeconds(5)); - } + latch.await(); + } - @ParameterizedDataBufferAllocatingTest - void takeUntilByteCountExact(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + @Test + void takeUntilByteCount() { + Flux result = DataBufferUtils.takeUntilByteCount( + Flux.just(stringBuffer("foo"), stringBuffer("bar")), 5L); - Flux source = Flux.concat( - deferStringBuffer("foo"), - deferStringBuffer("bar"), - deferStringBuffer("baz") - ); + StepVerifier.create(result) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("ba")) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } - Flux result = DataBufferUtils.takeUntilByteCount(source, 6L); + @Test + void takeUntilByteCountCanceled() { + Flux source = Flux.concat( + deferStringBuffer("foo"), + deferStringBuffer("bar") + ); + Flux result = DataBufferUtils.takeUntilByteCount( + source, 5L); - StepVerifier.create(result) - .consumeNextWith(stringConsumer("foo")) - .consumeNextWith(stringConsumer("bar")) - .expectComplete() - .verify(Duration.ofSeconds(5)); - } - - @ParameterizedDataBufferAllocatingTest - void skipUntilByteCount(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - - Flux source = Flux.concat( - deferStringBuffer("foo"), - deferStringBuffer("bar"), - deferStringBuffer("baz") - ); - Flux result = DataBufferUtils.skipUntilByteCount(source, 5L); - - StepVerifier.create(result) - .consumeNextWith(stringConsumer("r")) - .consumeNextWith(stringConsumer("baz")) - .expectComplete() - .verify(Duration.ofSeconds(5)); - } + StepVerifier.create(result) + .consumeNextWith(stringConsumer("foo")) + .thenCancel() + .verify(Duration.ofSeconds(5)); + } - @ParameterizedDataBufferAllocatingTest - void skipUntilByteCountCancelled(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + @Test + void takeUntilByteCountError() { + Flux source = Flux.concat( + Mono.defer(() -> Mono.just(stringBuffer("foo"))), + Mono.error(new RuntimeException()) + ); - Flux source = Flux.concat( - deferStringBuffer("foo"), - deferStringBuffer("bar") - ); - Flux result = DataBufferUtils.skipUntilByteCount(source, 5L); + Flux result = DataBufferUtils.takeUntilByteCount(source, 5L); - StepVerifier.create(result) - .consumeNextWith(stringConsumer("r")) - .thenCancel() - .verify(Duration.ofSeconds(5)); - } + StepVerifier.create(result) + .consumeNextWith(stringConsumer("foo")) + .expectError(RuntimeException.class) + .verify(Duration.ofSeconds(5)); + } - @ParameterizedDataBufferAllocatingTest - void skipUntilByteCountErrorInFlux(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + @Test + void takeUntilByteCountExact() { + Flux source = Flux.concat( + deferStringBuffer("foo"), + deferStringBuffer("bar"), + deferStringBuffer("baz") + ); - DataBuffer foo = stringBuffer("foo"); - Flux flux = - Flux.just(foo).concatWith(Mono.error(new RuntimeException())); - Flux result = DataBufferUtils.skipUntilByteCount(flux, 3L); + Flux result = DataBufferUtils.takeUntilByteCount(source, 6L); - StepVerifier.create(result) - .expectError(RuntimeException.class) - .verify(Duration.ofSeconds(5)); - } + StepVerifier.create(result) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } - @ParameterizedDataBufferAllocatingTest - void skipUntilByteCountShouldSkipAll(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + @Test + void skipUntilByteCount() { + Flux source = Flux.concat( + deferStringBuffer("foo"), + deferStringBuffer("bar"), + deferStringBuffer("baz") + ); + Flux result = DataBufferUtils.skipUntilByteCount(source, 5L); - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - DataBuffer baz = stringBuffer("baz"); - Flux flux = Flux.just(foo, bar, baz); - Flux result = DataBufferUtils.skipUntilByteCount(flux, 9L); + StepVerifier.create(result) + .consumeNextWith(stringConsumer("r")) + .consumeNextWith(stringConsumer("baz")) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } - StepVerifier.create(result) - .expectComplete() - .verify(Duration.ofSeconds(5)); - } + @Test + void skipUntilByteCountCancelled() { + Flux source = Flux.concat( + deferStringBuffer("foo"), + deferStringBuffer("bar") + ); + Flux result = DataBufferUtils.skipUntilByteCount(source, 5L); - @ParameterizedDataBufferAllocatingTest - void releaseConsumer(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + StepVerifier.create(result) + .consumeNextWith(stringConsumer("r")) + .thenCancel() + .verify(Duration.ofSeconds(5)); + } - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - DataBuffer baz = stringBuffer("baz"); - Flux flux = Flux.just(foo, bar, baz); + @Test + void skipUntilByteCountErrorInFlux() { + DataBuffer foo = stringBuffer("foo"); + Flux flux = + Flux.just(foo).concatWith(Mono.error(new RuntimeException())); + Flux result = DataBufferUtils.skipUntilByteCount(flux, 3L); - flux.subscribe(DataBufferUtils.releaseConsumer()); + StepVerifier.create(result) + .expectError(RuntimeException.class) + .verify(Duration.ofSeconds(5)); + } - assertReleased(foo); - assertReleased(bar); - assertReleased(baz); - } + @Test + void skipUntilByteCountShouldSkipAll() { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + Flux flux = Flux.just(foo, bar, baz); + Flux result = DataBufferUtils.skipUntilByteCount(flux, 9L); - private static void assertReleased(DataBuffer dataBuffer) { - if (dataBuffer instanceof NettyDataBuffer nettyDataBuffer) { - ByteBuf byteBuf = nettyDataBuffer.getNativeBuffer(); - assertThat(byteBuf.refCnt()).isEqualTo(0); + StepVerifier.create(result) + .expectComplete() + .verify(Duration.ofSeconds(5)); } - } - @ParameterizedDataBufferAllocatingTest - void SPR16070(DataBufferFactory bufferFactory) throws Exception { - super.bufferFactory = bufferFactory; - - ReadableByteChannel channel = mock(); - given(channel.read(any())) - .willAnswer(putByte('a')) - .willAnswer(putByte('b')) - .willAnswer(putByte('c')) - .willReturn(-1); - - Flux read = - DataBufferUtils.readByteChannel(() -> channel, super.bufferFactory, 1); - - StepVerifier.create(read) - .consumeNextWith(stringConsumer("a")) - .consumeNextWith(stringConsumer("b")) - .consumeNextWith(stringConsumer("c")) - .expectComplete() - .verify(Duration.ofSeconds(5)); - } + @Test + void releaseConsumer() { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + Flux flux = Flux.just(foo, bar, baz); - private Answer putByte(int b) { - return invocation -> { - ByteBuffer buffer = invocation.getArgument(0); - buffer.put((byte) b); - return 1; - }; - } + flux.subscribe(DataBufferUtils.releaseConsumer()); - @ParameterizedDataBufferAllocatingTest - void join(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - DataBuffer baz = stringBuffer("baz"); - Flux flux = Flux.just(foo, bar, baz); - Mono result = DataBufferUtils.join(flux); - - StepVerifier.create(result) - .consumeNextWith(buf -> { - assertThat(buf.toString(StandardCharsets.UTF_8)).isEqualTo("foobarbaz"); - release(buf); - }) - .verifyComplete(); - } - - @ParameterizedDataBufferAllocatingTest - void joinWithLimit(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + assertReleased(foo); + assertReleased(bar); + assertReleased(baz); + } - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - DataBuffer baz = stringBuffer("baz"); - Flux flux = Flux.just(foo, bar, baz); - Mono result = DataBufferUtils.join(flux, 8); + private static void assertReleased(DataBuffer dataBuffer) { + if (dataBuffer instanceof NettyDataBuffer nettyDataBuffer) { + ByteBuf byteBuf = nettyDataBuffer.getNativeBuffer(); + assertThat(byteBuf.refCnt()).isEqualTo(0); + } + } - StepVerifier.create(result) - .verifyError(DataBufferLimitException.class); - } + @Test + void SPR16070() throws Exception { + ReadableByteChannel channel = mock(); + given(channel.read(any())) + .willAnswer(putByte('a')) + .willAnswer(putByte('b')) + .willAnswer(putByte('c')) + .willReturn(-1); + + Flux read = + DataBufferUtils.readByteChannel(() -> channel, bufferFactory, 1); + + StepVerifier.create(read) + .consumeNextWith(stringConsumer("a")) + .consumeNextWith(stringConsumer("b")) + .consumeNextWith(stringConsumer("c")) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } - @Test // gh-26060 - void joinWithLimitDoesNotOverRelease() { - NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT); - byte[] bytes = "foo-bar-baz".getBytes(StandardCharsets.UTF_8); + private Answer putByte(int b) { + return invocation -> { + ByteBuffer buffer = invocation.getArgument(0); + buffer.put((byte) b); + return 1; + }; + } - NettyDataBuffer buffer = bufferFactory.allocateBuffer(bytes.length); - buffer.getNativeBuffer().retain(); // should be at 2 now - buffer.write(bytes); + @Test + void join() { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + Flux flux = Flux.just(foo, bar, baz); + Mono result = DataBufferUtils.join(flux); - Mono result = DataBufferUtils.join(Flux.just(buffer), 8); + StepVerifier.create(result) + .consumeNextWith(buf -> { + assertThat(buf.toString(StandardCharsets.UTF_8)).isEqualTo("foobarbaz"); + release(buf); + }) + .verifyComplete(); + } - StepVerifier.create(result).verifyError(DataBufferLimitException.class); - assertThat(buffer.getNativeBuffer().refCnt()).isEqualTo(1); - buffer.release(); - } + @Test + void joinWithLimit() { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + Flux flux = Flux.just(foo, bar, baz); + Mono result = DataBufferUtils.join(flux, 8); - @ParameterizedDataBufferAllocatingTest - void joinErrors(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + StepVerifier.create(result) + .verifyError(DataBufferLimitException.class); + } - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - Flux flux = Flux.just(foo, bar).concatWith(Flux.error(new RuntimeException())); - Mono result = DataBufferUtils.join(flux); + @Test + void joinErrors() { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + Flux flux = Flux.just(foo, bar).concatWith(Flux.error(new RuntimeException())); + Mono result = DataBufferUtils.join(flux); - StepVerifier.create(result) - .expectError(RuntimeException.class) - .verify(); - } + StepVerifier.create(result) + .expectError(RuntimeException.class) + .verify(); + } - @ParameterizedDataBufferAllocatingTest - void joinCanceled(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + @Test + void joinCanceled() { + Flux source = Flux.concat( + deferStringBuffer("foo"), + deferStringBuffer("bar"), + deferStringBuffer("baz") + ); + Mono result = DataBufferUtils.join(source); - Flux source = Flux.concat( - deferStringBuffer("foo"), - deferStringBuffer("bar"), - deferStringBuffer("baz") - ); - Mono result = DataBufferUtils.join(source); + StepVerifier.create(result) + .thenCancel() + .verify(); + } - StepVerifier.create(result) - .thenCancel() - .verify(); - } + @Test + void matcher() { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); - @ParameterizedDataBufferAllocatingTest - void matcher(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + byte[] delims = "ooba".getBytes(StandardCharsets.UTF_8); + DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delims); + int result = matcher.match(foo); + assertThat(result).isEqualTo(-1); + result = matcher.match(bar); + assertThat(result).isEqualTo(1); - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); + release(foo, bar); + } - byte[] delims = "ooba".getBytes(StandardCharsets.UTF_8); - DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delims); - int result = matcher.match(foo); - assertThat(result).isEqualTo(-1); - result = matcher.match(bar); - assertThat(result).isEqualTo(1); + @Test + void matcher2() { + DataBuffer foo = stringBuffer("foooobar"); + + byte[] delims = "oo".getBytes(StandardCharsets.UTF_8); + DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delims); + int endIndex = matcher.match(foo); + assertThat(endIndex).isEqualTo(2); + foo.readPosition(endIndex + 1); + endIndex = matcher.match(foo); + assertThat(endIndex).isEqualTo(4); + foo.readPosition(endIndex + 1); + endIndex = matcher.match(foo); + assertThat(endIndex).isEqualTo(-1); + + release(foo); + } + @Test + void matcher3() { + DataBuffer foo = stringBuffer("foooobar"); + + byte[] delims = "oo".getBytes(StandardCharsets.UTF_8); + DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delims); + int endIndex = matcher.match(foo); + assertThat(endIndex).isEqualTo(2); + foo.readPosition(endIndex + 1); + endIndex = matcher.match(foo); + assertThat(endIndex).isEqualTo(4); + foo.readPosition(endIndex + 1); + endIndex = matcher.match(foo); + assertThat(endIndex).isEqualTo(-1); + + release(foo); + } - release(foo, bar); - } + @Test + void propagateContextByteChannel() throws IOException { + Path path = Paths.get(resource.getURI()); + try (SeekableByteChannel out = Files.newByteChannel(tempFile, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) { + Flux result = DataBufferUtils.read(path, bufferFactory, 1024, StandardOpenOption.READ) + .transformDeferredContextual((f, ctx) -> { + assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); + return f; + }) + .transform(f -> DataBufferUtils.write(f, out)) + .transformDeferredContextual((f, ctx) -> { + assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); + return f; + }) + .contextWrite(Context.of("key", "TEST")); - @ParameterizedDataBufferAllocatingTest - void matcher2(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + StepVerifier.create(result) + .consumeNextWith(DataBufferUtils::release) + .verifyComplete(); - DataBuffer foo = stringBuffer("foooobar"); - byte[] delims = "oo".getBytes(StandardCharsets.UTF_8); - DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delims); - int endIndex = matcher.match(foo); - assertThat(endIndex).isEqualTo(2); - foo.readPosition(endIndex + 1); - endIndex = matcher.match(foo); - assertThat(endIndex).isEqualTo(4); - foo.readPosition(endIndex + 1); - endIndex = matcher.match(foo); - assertThat(endIndex).isEqualTo(-1); + } + } - release(foo); - } + @Test + void propagateContextAsynchronousFileChannel() throws IOException { + Path path = Paths.get(resource.getURI()); + try (AsynchronousFileChannel out = AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) { + Flux result = DataBufferUtils.read(path, bufferFactory, 1024, StandardOpenOption.READ) + .transformDeferredContextual((f, ctx) -> { + assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); + return f; + }) + .transform(f -> DataBufferUtils.write(f, out)) + .transformDeferredContextual((f, ctx) -> { + assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); + return f; + }) + .contextWrite(Context.of("key", "TEST")); - @ParameterizedDataBufferAllocatingTest - void matcher3(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + StepVerifier.create(result) + .consumeNextWith(DataBufferUtils::release) + .verifyComplete(); - DataBuffer foo = stringBuffer("foooobar"); - byte[] delims = "oo".getBytes(StandardCharsets.UTF_8); - DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delims); - int endIndex = matcher.match(foo); - assertThat(endIndex).isEqualTo(2); - foo.readPosition(endIndex + 1); - endIndex = matcher.match(foo); - assertThat(endIndex).isEqualTo(4); - foo.readPosition(endIndex + 1); - endIndex = matcher.match(foo); - assertThat(endIndex).isEqualTo(-1); + } + } - release(foo); - } + @Test + void propagateContextPath() throws IOException { + Path path = Paths.get(resource.getURI()); + Path out = Files.createTempFile("data-buffer-utils-tests", ".tmp"); - @ParameterizedDataBufferAllocatingTest - void propagateContextByteChannel(DataBufferFactory bufferFactory) throws IOException { - Path path = Paths.get(this.resource.getURI()); - try (SeekableByteChannel out = Files.newByteChannel(this.tempFile, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) { - Flux result = DataBufferUtils.read(path, bufferFactory, 1024, StandardOpenOption.READ) + Flux result = DataBufferUtils.read(path, bufferFactory, 1024, StandardOpenOption.READ) .transformDeferredContextual((f, ctx) -> { assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); return f; @@ -1298,65 +1222,33 @@ void propagateContextByteChannel(DataBufferFactory bufferFactory) throws IOExcep .contextWrite(Context.of("key", "TEST")); StepVerifier.create(result) - .consumeNextWith(DataBufferUtils::release) .verifyComplete(); - - } - } - @ParameterizedDataBufferAllocatingTest - void propagateContextAsynchronousFileChannel(DataBufferFactory bufferFactory) throws IOException { - Path path = Paths.get(this.resource.getURI()); - try (AsynchronousFileChannel out = AsynchronousFileChannel.open(this.tempFile, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) { - Flux result = DataBufferUtils.read(path, bufferFactory, 1024, StandardOpenOption.READ) - .transformDeferredContextual((f, ctx) -> { - assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); - return f; - }) - .transform(f -> DataBufferUtils.write(f, out)) - .transformDeferredContextual((f, ctx) -> { - assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); - return f; - }) - .contextWrite(Context.of("key", "TEST")); - - StepVerifier.create(result) - .consumeNextWith(DataBufferUtils::release) - .verifyComplete(); + private static class ZeroDemandSubscriber extends BaseSubscriber { + @Override + protected void hookOnSubscribe(Subscription subscription) { + // Just subscribe without requesting + } } } - @ParameterizedDataBufferAllocatingTest - void propagateContextPath(DataBufferFactory bufferFactory) throws IOException { - Path path = Paths.get(this.resource.getURI()); - Path out = Files.createTempFile("data-buffer-utils-tests", ".tmp"); - - Flux result = DataBufferUtils.read(path, bufferFactory, 1024, StandardOpenOption.READ) - .transformDeferredContextual((f, ctx) -> { - assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); - return f; - }) - .transform(f -> DataBufferUtils.write(f, out)) - .transformDeferredContextual((f, ctx) -> { - assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); - return f; - }) - .contextWrite(Context.of("key", "TEST")); - - StepVerifier.create(result) - .verifyComplete(); - } + @Test // gh-26060 + void joinWithLimitDoesNotOverRelease() { + NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT); + byte[] bytes = "foo-bar-baz".getBytes(StandardCharsets.UTF_8); + NettyDataBuffer buffer = bufferFactory.allocateBuffer(bytes.length); + buffer.getNativeBuffer().retain(); // should be at 2 now + buffer.write(bytes); - private static class ZeroDemandSubscriber extends BaseSubscriber { + Mono result = DataBufferUtils.join(Flux.just(buffer), 8); - @Override - protected void hookOnSubscribe(Subscription subscription) { - // Just subscribe without requesting - } + StepVerifier.create(result).verifyError(DataBufferLimitException.class); + assertThat(buffer.getNativeBuffer().refCnt()).isEqualTo(1); + buffer.release(); } } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/support/DataBufferTestUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/support/DataBufferTestUtilsTests.java index d12f3ea36de5..1d0d3c7a1612 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/support/DataBufferTestUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/support/DataBufferTestUtilsTests.java @@ -18,8 +18,9 @@ import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.Test; + import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.testfixture.io.buffer.AbstractDataBufferAllocatingTests; import org.springframework.core.testfixture.io.buffer.DataBufferTestUtils; @@ -31,10 +32,8 @@ */ class DataBufferTestUtilsTests extends AbstractDataBufferAllocatingTests { - @ParameterizedDataBufferAllocatingTest - void dumpBytes(DataBufferFactory bufferFactory) { - this.bufferFactory = bufferFactory; - + @Test + void dumpBytes() { DataBuffer buffer = this.bufferFactory.allocateBuffer(4); byte[] source = {'a', 'b', 'c', 'd'}; buffer.write(source); @@ -46,10 +45,8 @@ void dumpBytes(DataBufferFactory bufferFactory) { release(buffer); } - @ParameterizedDataBufferAllocatingTest - void dumpString(DataBufferFactory bufferFactory) { - this.bufferFactory = bufferFactory; - + @Test + void dumpString() { DataBuffer buffer = this.bufferFactory.allocateBuffer(4); String source = "abcd"; buffer.write(source.getBytes(StandardCharsets.UTF_8)); diff --git a/spring-core/src/test/java/org/springframework/util/MultiValueMapTests.java b/spring-core/src/test/java/org/springframework/util/MultiValueMapTests.java index 62895671174b..8eda2c77feb2 100644 --- a/spring-core/src/test/java/org/springframework/util/MultiValueMapTests.java +++ b/spring-core/src/test/java/org/springframework/util/MultiValueMapTests.java @@ -16,8 +16,6 @@ package org.springframework.util; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; @@ -25,8 +23,9 @@ import java.util.Map; import java.util.stream.Stream; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.ParameterizedClass; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -45,98 +44,120 @@ */ class MultiValueMapTests { - @ParameterizedMultiValueMapTest - void add(MultiValueMap map) { - int initialSize = map.size(); - map.add("key", "value1"); - map.add("key", "value2"); - assertThat(map).hasSize(initialSize + 1); - assertThat(map.get("key")).containsExactly("value1", "value2"); - } - - @ParameterizedMultiValueMapTest - void addIfAbsentWhenAbsent(MultiValueMap map) { - map.addIfAbsent("key", "value1"); - assertThat(map.get("key")).containsExactly("value1"); - } - - @ParameterizedMultiValueMapTest - void addIfAbsentWhenPresent(MultiValueMap map) { - map.add("key", "value1"); - map.addIfAbsent("key", "value2"); - assertThat(map.get("key")).containsExactly("value1"); - } - - @ParameterizedMultiValueMapTest - void set(MultiValueMap map) { - map.set("key", "value1"); - map.set("key", "value2"); - assertThat(map.get("key")).containsExactly("value2"); - } - - @ParameterizedMultiValueMapTest - void addAll(MultiValueMap map) { - int initialSize = map.size(); - map.add("key", "value1"); - map.addAll("key", Arrays.asList("value2", "value3")); - assertThat(map).hasSize(initialSize + 1); - assertThat(map.get("key")).containsExactly("value1", "value2", "value3"); - } - - @ParameterizedMultiValueMapTest - void addAllWithEmptyList(MultiValueMap map) { - int initialSize = map.size(); - map.addAll("key", List.of()); - assertThat(map).hasSize(initialSize + 1); - assertThat(map.get("key")).isEmpty(); - assertThat(map.getFirst("key")).isNull(); - } - - @ParameterizedMultiValueMapTest - void getFirst(MultiValueMap map) { - List values = List.of("value1", "value2"); - map.put("key", values); - assertThat(map.getFirst("key")).isEqualTo("value1"); - assertThat(map.getFirst("other")).isNull(); - } - - @ParameterizedMultiValueMapTest - void toSingleValueMap(MultiValueMap map) { - int initialSize = map.size(); - List values = List.of("value1", "value2"); - map.put("key", values); - Map singleValueMap = map.toSingleValueMap(); - assertThat(singleValueMap).hasSize(initialSize + 1); - assertThat(singleValueMap.get("key")).isEqualTo("value1"); - } - - @ParameterizedMultiValueMapTest - void toSingleValueMapWithEmptyList(MultiValueMap map) { - int initialSize = map.size(); - map.put("key", List.of()); - Map singleValueMap = map.toSingleValueMap(); - assertThat(singleValueMap).hasSize(initialSize); - assertThat(singleValueMap.get("key")).isNull(); - } - - @ParameterizedMultiValueMapTest - void equalsOnExistingValues(MultiValueMap map) { - map.clear(); - map.set("key1", "value1"); - assertThat(map).isEqualTo(map); - } - - @ParameterizedMultiValueMapTest - void equalsOnEmpty(MultiValueMap map) { - map.clear(); - map.set("key1", "value1"); - MultiValueMap map1 = new LinkedMultiValueMap<>(); - map1.set("key1", "value1"); - assertThat(map1).isEqualTo(map); - assertThat(map).isEqualTo(map1); - Map> map2 = Map.of("key1", List.of("value1")); - assertThat(map2).isEqualTo(map); - assertThat(map).isEqualTo(map2); + @Nested + @ParameterizedClass + @MethodSource("mapsUnderTest") + class ParameterizedMultiValueMapTests { + + private final MultiValueMap map; + + public ParameterizedMultiValueMapTests(MultiValueMap map) { + this.map = new LinkedMultiValueMap<>(map); + } + + @Test + void add() { + int initialSize = map.size(); + map.add("key", "value1"); + map.add("key", "value2"); + assertThat(map).hasSize(initialSize + 1); + assertThat(map.get("key")).containsExactly("value1", "value2"); + } + + @Test + void addIfAbsentWhenAbsent() { + map.addIfAbsent("key", "value1"); + assertThat(map.get("key")).containsExactly("value1"); + } + + @Test + void addIfAbsentWhenPresent() { + map.add("key", "value1"); + map.addIfAbsent("key", "value2"); + assertThat(map.get("key")).containsExactly("value1"); + } + + @Test + void set() { + map.set("key", "value1"); + map.set("key", "value2"); + assertThat(map.get("key")).containsExactly("value2"); + } + + @Test + void addAll() { + int initialSize = map.size(); + map.add("key", "value1"); + map.addAll("key", Arrays.asList("value2", "value3")); + assertThat(map).hasSize(initialSize + 1); + assertThat(map.get("key")).containsExactly("value1", "value2", "value3"); + } + + @Test + void addAllWithEmptyList() { + int initialSize = map.size(); + map.addAll("key", List.of()); + assertThat(map).hasSize(initialSize + 1); + assertThat(map.get("key")).isEmpty(); + assertThat(map.getFirst("key")).isNull(); + } + + @Test + void getFirst() { + List values = List.of("value1", "value2"); + map.put("key", values); + assertThat(map.getFirst("key")).isEqualTo("value1"); + assertThat(map.getFirst("other")).isNull(); + } + + @Test + void toSingleValueMap() { + int initialSize = map.size(); + List values = List.of("value1", "value2"); + map.put("key", values); + Map singleValueMap = map.toSingleValueMap(); + assertThat(singleValueMap).hasSize(initialSize + 1); + assertThat(singleValueMap.get("key")).isEqualTo("value1"); + } + + @Test + void toSingleValueMapWithEmptyList() { + int initialSize = map.size(); + map.put("key", List.of()); + Map singleValueMap = map.toSingleValueMap(); + assertThat(singleValueMap).hasSize(initialSize); + assertThat(singleValueMap.get("key")).isNull(); + } + + @Test + void equalsOnExistingValues() { + map.clear(); + map.set("key1", "value1"); + assertThat(map).isEqualTo(map); + } + + @Test + void equalsOnEmpty() { + map.clear(); + map.set("key1", "value1"); + MultiValueMap map1 = new LinkedMultiValueMap<>(); + map1.set("key1", "value1"); + assertThat(map1).isEqualTo(map); + assertThat(map).isEqualTo(map1); + Map> map2 = Map.of("key1", List.of("value1")); + assertThat(map2).isEqualTo(map); + assertThat(map).isEqualTo(map2); + } + + private static Stream mapsUnderTest() { + return Stream.of( + argumentSet("new LinkedMultiValueMap<>()", new LinkedMultiValueMap<>()), + argumentSet("new LinkedMultiValueMap<>(new HashMap<>())", new LinkedMultiValueMap<>(new HashMap<>())), + argumentSet("new LinkedMultiValueMap<>(new LinkedHashMap<>())", new LinkedMultiValueMap<>(new LinkedHashMap<>())), + argumentSet("new LinkedMultiValueMap<>(Map.of(...))", new LinkedMultiValueMap<>(Map.of("existingkey", List.of("existingvalue1", "existingvalue2")))), + argumentSet("CollectionUtils.toMultiValueMap", CollectionUtils.toMultiValueMap(new HashMap<>())) + ); + } } @Test @@ -181,21 +202,4 @@ private static MultiValueMap exampleMultiValueMap() { return map; } - - @Retention(RetentionPolicy.RUNTIME) - @ParameterizedTest - @MethodSource("mapsUnderTest") - @interface ParameterizedMultiValueMapTest { - } - - static Stream mapsUnderTest() { - return Stream.of( - argumentSet("new LinkedMultiValueMap<>()", new LinkedMultiValueMap<>()), - argumentSet("new LinkedMultiValueMap<>(new HashMap<>())", new LinkedMultiValueMap<>(new HashMap<>())), - argumentSet("new LinkedMultiValueMap<>(new LinkedHashMap<>())", new LinkedMultiValueMap<>(new LinkedHashMap<>())), - argumentSet("new LinkedMultiValueMap<>(Map.of(...))", new LinkedMultiValueMap<>(Map.of("existingkey", List.of("existingvalue1", "existingvalue2")))), - argumentSet("CollectionUtils.toMultiValueMap", CollectionUtils.toMultiValueMap(new HashMap<>())) - ); - } - } diff --git a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java index 742ada538d8e..fc5b36e0cfe8 100644 --- a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java +++ b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java @@ -16,10 +16,6 @@ package org.springframework.core.testfixture.io.buffer; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -37,7 +33,8 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.Parameter; +import org.junit.jupiter.params.ParameterizedClass; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import reactor.core.publisher.Mono; @@ -60,6 +57,8 @@ * @author Rossen Stoyanchev * @author Sam Brannen */ +@ParameterizedClass +@MethodSource("dataBufferFactories") public abstract class AbstractDataBufferAllocatingTests { private static UnpooledByteBufAllocator netty4OffHeapUnpooled; @@ -74,6 +73,7 @@ public abstract class AbstractDataBufferAllocatingTests { @RegisterExtension AfterEachCallback leakDetector = context -> waitForDataBufferRelease(Duration.ofSeconds(2)); + @Parameter protected DataBufferFactory bufferFactory; @@ -169,14 +169,6 @@ public static void createAllocators() { netty4OffHeapPooled = new PooledByteBufAllocator(true, 1, 1, 4096, 4, 0, 0, 0, true); } - - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.METHOD) - @ParameterizedTest - @MethodSource("org.springframework.core.testfixture.io.buffer.AbstractDataBufferAllocatingTests#dataBufferFactories()") - public @interface ParameterizedDataBufferAllocatingTest { - } - public static Stream dataBufferFactories() { return Stream.of( // Netty 4 diff --git a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java index 59aa10ce09b3..0b0defe06272 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.Map; +import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -30,7 +31,6 @@ import tools.jackson.databind.json.JsonMapper; import org.springframework.core.ResolvableType; -import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.testfixture.io.buffer.AbstractDataBufferAllocatingTests; import org.springframework.http.MediaType; @@ -57,9 +57,8 @@ class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAllocating new ServerSentEventHttpMessageWriter(new JacksonJsonEncoder()); - @ParameterizedDataBufferAllocatingTest - void canWrite(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; + @Test + void canWrite() { assertThat(this.messageWriter.canWrite(forClass(Object.class), null)).isTrue(); assertThat(this.messageWriter.canWrite(forClass(Object.class), new MediaType("foo", "bar"))).isFalse(); @@ -72,14 +71,12 @@ void canWrite(DataBufferFactory bufferFactory) { assertThat(this.messageWriter.canWrite(ResolvableType.NONE, new MediaType("foo", "bar"))).isFalse(); } - @ParameterizedDataBufferAllocatingTest - void writeServerSentEvent(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test + void writeServerSentEvent() { ServerSentEvent event = ServerSentEvent.builder().data("bar").id("c42").event("foo") .comment("bla\nbla bla\nbla bla bla").retry(Duration.ofMillis(123L)).build(); - MockServerHttpResponse outputMessage = new MockServerHttpResponse(super.bufferFactory); + MockServerHttpResponse outputMessage = new MockServerHttpResponse(bufferFactory); Mono source = Mono.just(event); testWrite(source, outputMessage, ServerSentEvent.class); @@ -90,11 +87,9 @@ void writeServerSentEvent(DataBufferFactory bufferFactory) { .verify(); } - @ParameterizedDataBufferAllocatingTest - void writeString(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - - MockServerHttpResponse outputMessage = new MockServerHttpResponse(super.bufferFactory); + @Test + void writeString() { + MockServerHttpResponse outputMessage = new MockServerHttpResponse(bufferFactory); Flux source = Flux.just("foo", "bar"); testWrite(source, outputMessage, String.class); @@ -105,11 +100,9 @@ void writeString(DataBufferFactory bufferFactory) { .verify(); } - @ParameterizedDataBufferAllocatingTest - void writeMultiLineString(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - - MockServerHttpResponse outputMessage = new MockServerHttpResponse(super.bufferFactory); + @Test + void writeMultiLineString() { + MockServerHttpResponse outputMessage = new MockServerHttpResponse(); Flux source = Flux.just("foo\nbar", "foo\nbaz"); testWrite(source, outputMessage, String.class); @@ -120,11 +113,9 @@ void writeMultiLineString(DataBufferFactory bufferFactory) { .verify(); } - @ParameterizedDataBufferAllocatingTest // SPR-16516 - void writeStringWithCustomCharset(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - - MockServerHttpResponse outputMessage = new MockServerHttpResponse(super.bufferFactory); + @Test // SPR-16516 + void writeStringWithCustomCharset() { + MockServerHttpResponse outputMessage = new MockServerHttpResponse(bufferFactory); Flux source = Flux.just("\u00A3"); Charset charset = StandardCharsets.ISO_8859_1; MediaType mediaType = new MediaType("text", "event-stream", charset); @@ -141,11 +132,9 @@ void writeStringWithCustomCharset(DataBufferFactory bufferFactory) { .verify(); } - @ParameterizedDataBufferAllocatingTest - void writePojo(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - - MockServerHttpResponse outputMessage = new MockServerHttpResponse(super.bufferFactory); + @Test + void writePojo() { + MockServerHttpResponse outputMessage = new MockServerHttpResponse(bufferFactory); Flux source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar")); testWrite(source, outputMessage, Pojo.class); @@ -160,14 +149,12 @@ void writePojo(DataBufferFactory bufferFactory) { .verify(); } - @ParameterizedDataBufferAllocatingTest // SPR-14899 - void writePojoWithPrettyPrint(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - + @Test // SPR-14899 + void writePojoWithPrettyPrint() { JsonMapper mapper = JsonMapper.builder().enable(SerializationFeature.INDENT_OUTPUT).build(); this.messageWriter = new ServerSentEventHttpMessageWriter(new JacksonJsonEncoder(mapper)); - MockServerHttpResponse outputMessage = new MockServerHttpResponse(super.bufferFactory); + MockServerHttpResponse outputMessage = new MockServerHttpResponse(bufferFactory); Flux source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar")); testWrite(source, outputMessage, Pojo.class); @@ -190,11 +177,9 @@ void writePojoWithPrettyPrint(DataBufferFactory bufferFactory) { .verify(); } - @ParameterizedDataBufferAllocatingTest // SPR-16516, SPR-16539 - void writePojoWithCustomEncoding(DataBufferFactory bufferFactory) { - super.bufferFactory = bufferFactory; - - MockServerHttpResponse outputMessage = new MockServerHttpResponse(super.bufferFactory); + @Test // SPR-16516, SPR-16539 + void writePojoWithCustomEncoding() { + MockServerHttpResponse outputMessage = new MockServerHttpResponse(bufferFactory); Flux source = Flux.just(new Pojo("foo\uD834\uDD1E", "bar\uD834\uDD1E")); Charset charset = StandardCharsets.UTF_16LE; MediaType mediaType = new MediaType("text", "event-stream", charset); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java index 58624dd8bce9..1dd0ec9c3b32 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java @@ -28,6 +28,7 @@ import mockwebserver3.MockWebServer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -97,8 +98,8 @@ private ReactorClientHttpConnector initConnector() { } - @ParameterizedDataBufferAllocatingTest - void bodyToMonoVoid(DataBufferFactory bufferFactory) throws IOException { + @Test + void bodyToMonoVoid() throws IOException { setUp(bufferFactory); this.server.enqueue(new MockResponse.Builder(). @@ -116,8 +117,8 @@ void bodyToMonoVoid(DataBufferFactory bufferFactory) throws IOException { assertThat(this.server.getRequestCount()).isEqualTo(1); } - @ParameterizedDataBufferAllocatingTest // SPR-17482 - void bodyToMonoVoidWithoutContentType(DataBufferFactory bufferFactory) throws IOException { + @Test // SPR-17482 + void bodyToMonoVoidWithoutContentType() throws IOException { setUp(bufferFactory); this.server.enqueue(new MockResponse.Builder() @@ -134,40 +135,40 @@ void bodyToMonoVoidWithoutContentType(DataBufferFactory bufferFactory) throws IO assertThat(this.server.getRequestCount()).isEqualTo(1); } - @ParameterizedDataBufferAllocatingTest - void onStatusWithBodyNotConsumed(DataBufferFactory bufferFactory) throws IOException { + @Test + void onStatusWithBodyNotConsumed() throws IOException { setUp(bufferFactory); RuntimeException ex = new RuntimeException("response error"); testOnStatus(ex, response -> Mono.just(ex)); } - @ParameterizedDataBufferAllocatingTest - void onStatusWithBodyConsumed(DataBufferFactory bufferFactory) throws IOException { + @Test + void onStatusWithBodyConsumed() throws IOException { setUp(bufferFactory); RuntimeException ex = new RuntimeException("response error"); testOnStatus(ex, response -> response.bodyToMono(Void.class).thenReturn(ex)); } - @ParameterizedDataBufferAllocatingTest // SPR-17473 - void onStatusWithMonoErrorAndBodyNotConsumed(DataBufferFactory bufferFactory) throws IOException { + @Test // SPR-17473 + void onStatusWithMonoErrorAndBodyNotConsumed() throws IOException { setUp(bufferFactory); RuntimeException ex = new RuntimeException("response error"); testOnStatus(ex, response -> Mono.error(ex)); } - @ParameterizedDataBufferAllocatingTest - void onStatusWithMonoErrorAndBodyConsumed(DataBufferFactory bufferFactory) throws IOException { + @Test + void onStatusWithMonoErrorAndBodyConsumed() throws IOException { setUp(bufferFactory); RuntimeException ex = new RuntimeException("response error"); testOnStatus(ex, response -> response.bodyToMono(Void.class).then(Mono.error(ex))); } - @ParameterizedDataBufferAllocatingTest // gh-23230 - void onStatusWithImmediateErrorAndBodyNotConsumed(DataBufferFactory bufferFactory) throws IOException { + @Test // gh-23230 + void onStatusWithImmediateErrorAndBodyNotConsumed() throws IOException { setUp(bufferFactory); RuntimeException ex = new RuntimeException("response error"); @@ -176,8 +177,8 @@ void onStatusWithImmediateErrorAndBodyNotConsumed(DataBufferFactory bufferFactor }); } - @ParameterizedDataBufferAllocatingTest - void releaseBody(DataBufferFactory bufferFactory) throws IOException { + @Test + void releaseBody() throws IOException { setUp(bufferFactory); this.server.enqueue(new MockResponse.Builder() @@ -194,8 +195,8 @@ void releaseBody(DataBufferFactory bufferFactory) throws IOException { .verify(Duration.ofSeconds(3)); } - @ParameterizedDataBufferAllocatingTest - void exchangeToBodilessEntity(DataBufferFactory bufferFactory) throws IOException { + @Test + void exchangeToBodilessEntity() throws IOException { setUp(bufferFactory); this.server.enqueue(new MockResponse.Builder()