Skip to content

ARTEMIS-5530 Some handling of compressed messages can throw NegativeA… #5764

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,6 @@ private void handleCompressedMessage(final ClientMessageInternal clMessage) thro
qbuff.readBytes(body);
largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
currentLargeMessageController.addPacket(body, body.length, false);
largeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);

handleRegularMessage(largeMessage);
}
Expand Down Expand Up @@ -681,7 +680,6 @@ public synchronized void handleLargeMessage(final ClientLargeMessageInternal cli

if (clientLargeMessage.isCompressed()) {
clientLargeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
clientLargeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
} else {
clientLargeMessage.setLargeMessageController(currentLargeMessageController);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ private void checkBuffer() throws ActiveMQException {
writableBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer.duplicate(), this);

largeMessageController.saveBuffer(new ActiveMQOutputStream(writableBuffer));

unsetCompressionPropertyIfNeeded();
}
}

private void unsetCompressionPropertyIfNeeded() {
if (largeMessageController instanceof CompressedLargeMessageControllerImpl) {
putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,76 @@ public void testPreviouslyCompressedMessageCleanup() throws Exception {
locator2.close();
}

@TestTemplate
public void testCompressedMessageRouting() throws Exception {
SimpleString DATA = SimpleString.of(RandomUtil.randomAlphaNumericString(1024 * 1024));

ActiveMQServer server = createServer(true, isNetty());
server.start();

server.createQueue(QueueConfiguration.of(ADDRESS).setRoutingType(RoutingType.ANYCAST));

locator.setAckBatchSize(0);

ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(true, true);
ClientProducer producer = session.createProducer(ADDRESS);
ClientConsumer consumer = session.createConsumer(ADDRESS);

ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeNullableSimpleString(DATA);
producer.send(message);

session.start();
message = consumer.receive(2000);
assertNotNull(message);
assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
message.checkCompletion();
message.acknowledge();

ServerLocator locator2 = createFactory(isNetty());
ServerLocator locator3 = createFactory(isNetty());
locator2.setMinLargeMessageSize(10240);
//Any sufficiently large value here causes a "java.lang.NegativeArraySizeException"
locator3.setMinLargeMessageSize(1024000);

ClientSessionFactory sf2 = locator2.createSessionFactory();
ClientSessionFactory sf3 = locator3.createSessionFactory();
ClientSession session2 = sf2.createSession(true, true);
ClientSession session3 = sf3.createSession(true, true);
ClientProducer producer2 = session2.createProducer(ADDRESS);
ClientProducer producer3 = session3.createProducer(ADDRESS);
ClientMessage receivedMessage;

System.out.println("Send");

//Notice the _AMQ_LARGE_SIZE value changing part way through
for (int i = 0; i < 3; i++) {
System.out.println(message);
producer.send(message);
System.out.println(message);
producer2.send(message);
System.out.println(message);
producer3.send(message);
}

System.out.println("Receive");

for (int i = 0; i < 9; i++) {
receivedMessage = consumer.receive(2000);
assertNotNull(receivedMessage);
System.out.println(receivedMessage);
assertEquals(DATA, receivedMessage.getBodyBuffer().readNullableSimpleString());
receivedMessage.acknowledge();
}

consumer.close();

}

@TestTemplate
public void testLargeMessageCompressionLevel() throws Exception {
SimpleString DATA = SimpleString.of(RandomUtil.randomAlphaNumericString(1024 * 1024));

SimpleString address1 = SimpleString.of("address1");
SimpleString address2 = SimpleString.of("address2");
Expand Down Expand Up @@ -602,16 +670,16 @@ public void testLargeMessageCompressionLevel() throws Exception {
session2.createQueue(QueueConfiguration.of(address2));
session3.createQueue(QueueConfiguration.of(address3));

String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
for (int i = 0; i < 20; i++) {
inputString = inputString + inputString;
}
ClientMessage message1 = session1.createMessage(true);
ClientMessage message2 = session2.createMessage(true);
ClientMessage message3 = session3.createMessage(true);
message1.getBodyBuffer().writeNullableSimpleString(DATA);
message2.getBodyBuffer().writeNullableSimpleString(DATA);
message3.getBodyBuffer().writeNullableSimpleString(DATA);

ClientMessage message = session1.createMessage(true);
message.getBodyBuffer().writeString(inputString);
producer1.send(message);
producer2.send(message);
producer3.send(message);
producer1.send(message1);
producer2.send(message2);
producer3.send(message3);

QueueControl queueControl1 = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + address1);
Expand All @@ -623,10 +691,34 @@ public void testLargeMessageCompressionLevel() throws Exception {
assertEquals(1, queueControl1.getMessageCount());
assertEquals(1, queueControl2.getMessageCount());
assertEquals(1, queueControl3.getMessageCount());
assertTrue(message.getPersistentSize() > queueControl1.getPersistentSize());

assertTrue(message1.getPersistentSize() > queueControl1.getPersistentSize());
assertTrue(queueControl1.getPersistentSize() > queueControl2.getPersistentSize());
assertTrue(queueControl2.getPersistentSize() > queueControl3.getPersistentSize());

ClientConsumer consumer1 = session1.createConsumer(address1);
ClientConsumer consumer2 = session2.createConsumer(address2);
ClientConsumer consumer3 = session3.createConsumer(address3);
session1.start();
session2.start();
session3.start();

ClientMessage message;
message = consumer1.receive(2000);
assertNotNull(message);
assertEquals(DATA, message.getBodyBuffer().readNullableSimpleString());
message.acknowledge();

message = consumer2.receive(2000);
assertNotNull(message);
assertEquals(DATA, message.getBodyBuffer().readNullableSimpleString());
message.acknowledge();

message = consumer3.receive(2000);
assertNotNull(message);
assertEquals(DATA, message.getBodyBuffer().readNullableSimpleString());
message.acknowledge();

sf1.close();
sf2.close();
sf3.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.artemis.api.core.RoutingType;
Expand All @@ -37,9 +38,11 @@
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -647,6 +650,50 @@ public void testFederatedAddressChainOfBrokers() throws Exception {
}
}

@Test
public void testUpstreamFederatedAddressWithCompressedMessage() throws Exception {
final String DATA = RandomUtil.randomAlphaNumericString(1024 * 1024);
final String address = getName();

FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();

FederationConfiguration federationConfiguration2 = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server0", address);
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration2);
getServer(1).getFederationManager().deploy();

ActiveMQConnectionFactory cf0 = new ActiveMQConnectionFactory("vm://" + 0);
ActiveMQConnectionFactory cf1 = new ActiveMQConnectionFactory("vm://" + 1);
cf0.setCompressLargeMessage(true);
cf1.setCompressLargeMessage(true);

try (Connection connection1 = cf1.createConnection();
Connection connection0 = cf0.createConnection()) {
connection1.start();
connection0.start();

Session session0 = connection0.createSession();
Session session1 = connection1.createSession();
Topic topic0 = session0.createTopic(address);
Topic topic1 = session1.createTopic(address);
MessageConsumer consumer0 = session0.createConsumer(topic0);
MessageConsumer consumer1 = session1.createConsumer(topic1);

MessageProducer producer = session0.createProducer(topic0);
producer.send(session0.createTextMessage(DATA));

TextMessage message0 = (TextMessage) consumer0.receive(1000);
assertNotNull(message0);
assertEquals(DATA, message0.getText());

TextMessage message1 = (TextMessage) consumer1.receive(1000);
assertNotNull(message1);
assertEquals(DATA, message1.getText());

}
}

private Message createTextMessage(Session session1, String group) throws JMSException {
Message message = session1.createTextMessage("hello");
message.setStringProperty("JMSXGroupID", group);
Expand Down