1
0
mirror of https://github.com/apache/activemq-artemis.git synced 2025-02-20 08:55:18 +00:00
This commit is contained in:
Clebert Suconic 2019-03-13 15:03:52 -04:00
commit 3a3be57fd3
3 changed files with 65 additions and 2 deletions
artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal
tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl

@ -244,7 +244,7 @@ public final class TimedBuffer extends CriticalComponentImpl {
}
if (sizeChecked > bufferSize) {
throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize + ") on the journal");
throw new IllegalStateException("Can't write records (size=" + sizeChecked + ") bigger than the bufferSize(" + bufferSize + ") on the journal");
}
if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit) {

@ -42,13 +42,17 @@ import io.netty.buffer.Unpooled;
public final class LargeServerMessageImpl extends CoreMessage implements LargeServerMessage {
// When a message is stored on the journal, it will contain some header and trail on the journal
// we need to take that into consideration if that would fit the Journal TimedBuffer.
private static final int ESTIMATE_RECORD_TRAIL = 512;
/** This will check if a regular message needs to be converted as large message */
public static Message checkLargeMessage(Message message, StorageManager storageManager) throws Exception {
if (message.isLargeMessage()) {
return message; // nothing to be done on this case
}
if (message.getEncodeSize() > storageManager.getMaxRecordSize()) {
if (message.getEncodeSize() + ESTIMATE_RECORD_TRAIL > storageManager.getMaxRecordSize()) {
return asLargeMessage(message, storageManager);
} else {
return message;

@ -24,9 +24,18 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
@ -336,6 +345,56 @@ public class MessageImplTest extends ActiveMQTestBase {
buf.writeBytes(new byte[1024]);
}
@Test
public void testCloseCallBuffer() throws Exception {
SimpleString ADDRESS = new SimpleString("SimpleAddress");
final int messageSize = 1024 * 1024 - 64;
final int journalsize = 10 * 1024 * 1024;
ServerLocator locator = createInVMNonHALocator();
locator.setMinLargeMessageSize(1024 * 1024);
ClientSession session = null;
ConfigurationImpl config = (ConfigurationImpl)createDefaultConfig(false);
config.setJournalFileSize(journalsize).setJournalBufferSize_AIO(1024 * 1024).setJournalBufferSize_NIO(1024 * 1024);
ActiveMQServer server = createServer(true, config);
server.start();
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
session = addClientSession(sf.createSession(false, false, 0));
session.createQueue(ADDRESS, ADDRESS, true);
ClientProducer producer = session.createProducer(ADDRESS);
ClientConsumer consumer = session.createConsumer(ADDRESS);
ClientMessage clientFile = session.createMessage(true);
for (int i = 0; i < messageSize; i++) {
clientFile.getBodyBuffer().writeByte(getSamplebyte(i));
}
producer.send(clientFile);
session.commit();
session.start();
ClientMessage msg1 = consumer.receive(1000);
Wait.assertTrue(server::isActive);
assertNotNull(msg1);
}
// Protected -------------------------------------------------------------------------------
// Private ----------------------------------------------------------------------------------