diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 724c9695bc..6de092d432 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -1263,6 +1263,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal txID, id, recordType, record); } + JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record); + int encodeSize = addRecord.getEncodeSize(); + + if (encodeSize > getMaxRecordSize()) { + //The record size should be larger than max record size only on the large messages case. + throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(encodeSize, getMaxRecordSize()); + } appendExecutor.execute(new Runnable() { @@ -1276,9 +1283,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal if (tx != null) { tx.checkErrorCondition(); } - JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record); // we need to calculate the encodeSize here, as it may use caches that are eliminated once the record is written - int encodeSize = addRecord.getEncodeSize(); JournalFile usedFile = appendRecord(addRecord, false, false, tx, null); if (logger.isTraceEnabled()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 5720c49820..6f7fb3c897 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -77,6 +77,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult; +import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueQueryResult; @@ -1914,7 +1915,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener { result = handleManagementMessage(tx, message, direct); } else { - result = doSend(tx, message, address, direct, senderName, noAutoCreateQueue, routingContext); + try { + result = doSend(tx, message, address, direct, senderName, noAutoCreateQueue, routingContext); + } catch (ActiveMQIOErrorException e) { + if (tx != null) { + tx.markAsRollbackOnly(e); + } + if (message.isLargeMessage()) { + ((LargeServerMessage)message).deleteFile(); + } + throw e; + } } if (AuditLogger.isMessageLoggingEnabled()) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java index 94bb09c7fa..b9e6aa863c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -41,6 +42,7 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.message.LargeBodyReader; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -49,6 +51,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage; import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -521,6 +524,81 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { } } + @Test + public void testLargeHeaderTXLargeBody() throws Exception { + Assume.assumeFalse(jdbc); // the checked rule with the property size will not be applied to JDBC, hence we skip the test + testLargeHeaderTX(true); + } + + @Test + public void testLargeHeaderTXSmallBody() throws Exception { + Assume.assumeFalse(jdbc); // the checked rule with the property size will not be applied to JDBC, hence we skip the test + testLargeHeaderTX(false); + } + + private void testLargeHeaderTX(boolean largeBody) throws Exception { + String testQueueName = RandomUtil.randomString(); + server.createQueue(new QueueConfiguration(testQueueName).setRoutingType(RoutingType.ANYCAST)); + ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672"); + + String largeString; + { + StringBuffer buffer = new StringBuffer(); + while (buffer.length() < 1024 * 1024) { + buffer.append("This is a large string "); + } + largeString = buffer.toString(); + } + + String smallString = "small string"; + + String body = largeBody ? largeString : smallString; + + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(testQueueName)); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + Message message = session.createTextMessage(body); + message.setStringProperty("test", largeString); + boolean failed = false; + try { + producer.send(message); + session.commit(); + } catch (Exception expected) { + failed = true; + } + Assert.assertTrue(failed); + } + + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(testQueueName)); + + Message message = session.createTextMessage(body); + message.setStringProperty("test", smallString); + producer.send(message); + session.commit(); + + connection.start(); + + MessageConsumer consumer = session.createConsumer(session.createQueue(testQueueName)); + TextMessage recMessage = (TextMessage) consumer.receive(5000); + Assert.assertEquals(smallString, recMessage.getStringProperty("test")); + Assert.assertEquals(body, recMessage.getText()); + session.commit(); + + Assert.assertNull(consumer.receiveNoWait()); + } + + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(testQueueName); + Wait.assertEquals(0, serverQueue::getMessageCount); + + File largeMessageFolder = server.getConfiguration().getLargeMessagesLocation(); + File[] files = largeMessageFolder.listFiles(); + Assert.assertTrue(files == null ? "Null Files" : "There are " + files.length + " files in the large message folder", files == null || files.length == 0); + } + + @Test(timeout = 60000) public void testSendSmallerMessages() throws Exception { for (int i = 512; i <= (8 * 1024); i += 512) {