This commit is contained in:
Clebert Suconic 2019-06-10 17:36:19 -04:00
commit a652ec251f
4 changed files with 70 additions and 2 deletions

View File

@ -832,6 +832,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
", record = " + record);
}
final long maxRecordSize = getMaxRecordSize();
final JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
final int addRecordEncodeSize = addRecord.getEncodeSize();
if (addRecordEncodeSize > maxRecordSize) {
//The record size should be larger than max record size only on the large messages case.
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize);
}
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@ -839,9 +847,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void run() {
journalLock.readLock().lock();
try {
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
records.put(id, new JournalRecord(usedFile, addRecordEncodeSize));
if (logger.isTraceEnabled()) {
logger.trace("appendAddRecord::id=" + id +

View File

@ -47,4 +47,7 @@ public interface ActiveMQJournalBundle {
@Message(id = 149004, value = "unable to open file")
String unableToOpenFile();
@Message(id = 149005, value = "Message of {0} bytes is bigger than the max record size of {1} bytes. You should try to move large application properties to the message body.", format = Message.Format.MESSAGE_FORMAT)
ActiveMQIOErrorException recordLargerThanStoreMax(long recordSize, long maxRecordSize);
}

View File

@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
@ -548,6 +549,17 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
largeMessage.setMessageID(id);
// Check durable large massage size before to allocate resources if it can't be stored
if (largeMessage.isDurable()) {
final long maxRecordSize = getMaxRecordSize();
final int messageEncodeSize = largeMessage.getEncodeSize();
if (messageEncodeSize > maxRecordSize) {
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(messageEncodeSize, maxRecordSize);
}
}
// We do this here to avoid a case where the replication gets a list without this file
// to avoid a race
largeMessage.validateFile();

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@ -265,6 +266,51 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
return payload;
}
@Test(timeout = 60000)
public void testSendHugeHeader() throws Exception {
doTestSendHugeHeader(PAYLOAD);
}
@Test(timeout = 60000)
public void testSendLargeMessageWithHugeHeader() throws Exception {
doTestSendHugeHeader(1024 * 1024);
}
public void doTestSendHugeHeader(int expectedSize) throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
connection.connect();
final int strLength = 1024 * 1024;
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(testQueueName);
AmqpMessage message = createAmqpMessage((byte) 'A', expectedSize);
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < strLength; i++) {
buffer.append(" ");
}
message.setApplicationProperty("str", buffer.toString());
message.setDurable(true);
try {
sender.send(message);
fail();
} catch (IOException e) {
Assert.assertTrue(e.getCause() instanceof JMSException);
Assert.assertTrue(e.getMessage().contains("AMQ149005"));
}
session.close();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendSmallerMessages() throws Exception {
for (int i = 512; i <= (8 * 1024); i += 512) {