Skip to content
This repository was archived by the owner on Jun 29, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,19 @@
</executions>
</plugin>

<plugin>
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason for this change? maven-source-plugin is pulled in through the parent pom and executed on release builds.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there is no reason to this change. May be ignored.

<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class GelfTCPSender extends AbstractNioSender<SocketChannel> implements G
private final int deliveryAttempts;

private final Object connectLock = new Object();
private final Object writeLock = new Object();

private final ThreadLocal<ByteBuffer> writeBuffers = new ThreadLocal<ByteBuffer>() {
@Override
Expand Down Expand Up @@ -95,26 +96,32 @@ public boolean sendMessage(GelfMessage message) {

IOException exception = null;

for (int i = 0; i < deliveryAttempts; i++) {
try {
synchronized (writeLock) {
for (int i = 0; i < deliveryAttempts; i++) {
try {

// (re)-connect if necessary
if (!isConnected()) {
synchronized (connectLock) {
connect();
// (re)-connect if necessary
if (!isConnected()) {
synchronized (connectLock) {
connect();
}
}
}

if (BUFFER_SIZE == 0) {
channel().write(message.toTCPBuffer());
} else {
channel().write(message.toTCPBuffer(getByteBuffer()));
}
ByteBuffer byteBuffer;
if (BUFFER_SIZE == 0) {
byteBuffer = message.toTCPBuffer();
} else {
byteBuffer = message.toTCPBuffer(getByteBuffer());
}
while(byteBuffer.hasRemaining()) {
channel().write(byteBuffer);
}

return true;
} catch (IOException e) {
Closer.close(channel());
exception = e;
return true;
} catch (IOException e) {
Closer.close(channel());
exception = e;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.io.InputStream;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -131,6 +140,124 @@ public void shouldSendDataToOpenPort() throws Exception {
spy.close();
}

@Test
public void socketSendBufferOverflowTest() throws Exception {

final int TEST_DURATION_MILLIS = 3000;

int port = randomPort();

// some payload to increase size of GelfMessage
final GelfMessage gelfMessage = new GelfMessage("" +
"----------------------------------------------------------------------------------------------------" +
"----------------------------------------------------------------------------------------------------" +
"----------------------------------------------------------------------------------------------------" +
"----------------------------------------------------------------------------------------------------" +
"----------------------------------------------------------------------------------------------------" +
"", "" +
"====================================================================================================" +
"====================================================================================================" +
"====================================================================================================" +
"====================================================================================================" +
"====================================================================================================" +
"====================================================================================================" +
"====================================================================================================" +
"====================================================================================================" +
"====================================================================================================" +
"====================================================================================================" +
"", 1000, "info");
gelfMessage.setHost("host");

final ByteBuffer referenceMessage = gelfMessage.toTCPBuffer();
final byte[] referenceMessageArray = referenceMessage.array();

final AtomicInteger readBytesCount = new AtomicInteger();
final AtomicLong lastReadTs = new AtomicLong();
final AtomicBoolean running = new AtomicBoolean(true);

final long startTs = System.currentTimeMillis();

// local server, reads all data and checks gelf messages stream integrity
final ServerSocket listener = new ServerSocket(port);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
byte[] arr = new byte[1024 * 1024];
try {
Socket socket = listener.accept();
InputStream inputStream = socket.getInputStream();

int currIdx = 0;
int len;
while ((len = inputStream.read(arr)) != -1) {
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis > startTs + TEST_DURATION_MILLIS) {
running.set(false);
}

lastReadTs.set(currentTimeMillis);
readBytesCount.addAndGet(len);

// check GelfMessages integrity, stream of similar gelf messages
for (int i = 0; i < len; i++) {
if (referenceMessageArray[currIdx] != arr[i]) {
running.set(false);
Assert.fail("inconsistent message: '" + new String(arr, Math.max(0, i - 20),
Math.min(40, len)) + "'");
}

if (++currIdx == referenceMessageArray.length) {
currIdx = 0;
}
}
// message processing delay
Thread.sleep(10);
}
} catch (IOException e) {
} catch (InterruptedException e) {
}
}
});
thread.start();

// log senders
final GelfTCPSender tcpSender = new GelfTCPSender("127.0.0.1", port, 1000, 1000, errorReporter);

final AtomicInteger sendMessagesCount = new AtomicInteger();
Runnable runnable = new Runnable() {
@Override
public void run() {
while (running.get()) {
tcpSender.sendMessage(gelfMessage);
sendMessagesCount.incrementAndGet();
}
}
};
int writersCount = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
ExecutorService executorService = Executors.newFixedThreadPool(writersCount);
List<Future> futures = new ArrayList<Future>();
for (int i = 0; i < writersCount; i++) {
futures.add(executorService.submit(runnable));
}

// waiting writer tasks to complete
for (Future future : futures) {
future.get();
}
tcpSender.close();
executorService.shutdown();

// waiting listener completes reading, 1 sec idle state
while (System.currentTimeMillis() - lastReadTs.get() < 1000) {
Thread.sleep(100);
}

listener.close();

int msgLen = referenceMessage.capacity();
Assert.assertEquals(sendMessagesCount.get() * msgLen, readBytesCount.get());
}

@Test
public void shouldSendDataToClosedPort() throws Exception {

Expand Down