ARTEMIS-4282 Large Header may break the broker

This commit is contained in:
Clebert Suconic 2023-05-17 15:54:03 -04:00 committed by clebertsuconic
parent ec54576323
commit 03afbedfe3
3 changed files with 97 additions and 3 deletions

View File

@ -1263,6 +1263,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
txID, id, recordType, record);
}
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
int encodeSize = addRecord.getEncodeSize();
if (encodeSize > getMaxRecordSize()) {
//The record size should be larger than max record size only on the large messages case.
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(encodeSize, getMaxRecordSize());
}
appendExecutor.execute(new Runnable() {
@ -1276,9 +1283,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
if (tx != null) {
tx.checkErrorCondition();
}
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
// we need to calculate the encodeSize here, as it may use caches that are eliminated once the record is written
int encodeSize = addRecord.getEncodeSize();
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
if (logger.isTraceEnabled()) {

View File

@ -77,6 +77,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
@ -1914,7 +1915,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
result = handleManagementMessage(tx, message, direct);
} else {
try {
result = doSend(tx, message, address, direct, senderName, noAutoCreateQueue, routingContext);
} catch (ActiveMQIOErrorException e) {
if (tx != null) {
tx.markAsRollbackOnly(e);
}
if (message.isLargeMessage()) {
((LargeServerMessage)message).deleteFile();
}
throw e;
}
}
if (AuditLogger.isMessageLoggingEnabled()) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@ -41,6 +42,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -49,6 +51,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -521,6 +524,81 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
}
}
@Test
public void testLargeHeaderTXLargeBody() throws Exception {
Assume.assumeFalse(jdbc); // the checked rule with the property size will not be applied to JDBC, hence we skip the test
testLargeHeaderTX(true);
}
@Test
public void testLargeHeaderTXSmallBody() throws Exception {
Assume.assumeFalse(jdbc); // the checked rule with the property size will not be applied to JDBC, hence we skip the test
testLargeHeaderTX(false);
}
private void testLargeHeaderTX(boolean largeBody) throws Exception {
String testQueueName = RandomUtil.randomString();
server.createQueue(new QueueConfiguration(testQueueName).setRoutingType(RoutingType.ANYCAST));
ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
String largeString;
{
StringBuffer buffer = new StringBuffer();
while (buffer.length() < 1024 * 1024) {
buffer.append("This is a large string ");
}
largeString = buffer.toString();
}
String smallString = "small string";
String body = largeBody ? largeString : smallString;
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(testQueueName));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message message = session.createTextMessage(body);
message.setStringProperty("test", largeString);
boolean failed = false;
try {
producer.send(message);
session.commit();
} catch (Exception expected) {
failed = true;
}
Assert.assertTrue(failed);
}
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(testQueueName));
Message message = session.createTextMessage(body);
message.setStringProperty("test", smallString);
producer.send(message);
session.commit();
connection.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(testQueueName));
TextMessage recMessage = (TextMessage) consumer.receive(5000);
Assert.assertEquals(smallString, recMessage.getStringProperty("test"));
Assert.assertEquals(body, recMessage.getText());
session.commit();
Assert.assertNull(consumer.receiveNoWait());
}
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(testQueueName);
Wait.assertEquals(0, serverQueue::getMessageCount);
File largeMessageFolder = server.getConfiguration().getLargeMessagesLocation();
File[] files = largeMessageFolder.listFiles();
Assert.assertTrue(files == null ? "Null Files" : "There are " + files.length + " files in the large message folder", files == null || files.length == 0);
}
@Test(timeout = 60000)
public void testSendSmallerMessages() throws Exception {
for (int i = 512; i <= (8 * 1024); i += 512) {