diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index dfbf1ba647..862bdbeb2d 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -120,6 +120,11 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM return false; } + @Override + public long getBufferSize() { + return dbDriver.getMaxSize(); + } + @Override public synchronized void start() { try { diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index e6a8b998e0..f6922bdfdf 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -127,6 +127,16 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { public void flush() throws Exception { } + /** + * The max size record that can be stored in the journal + * + * @return + */ + @Override + public long getMaxRecordSize() { + return sqlProvider.getMaxBlobSize(); + } + @Override protected void createSchema() throws SQLException { createTable(sqlProvider.getCreateJournalTableSQL()); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java index c8277e3b39..f40a6c408a 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java @@ -101,4 +101,6 @@ public interface SequentialFileFactory { SequentialFileFactory setDatasync(boolean enabled); boolean isDatasync(); + + long getBufferSize(); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index df71c160d9..2b81c5989c 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -269,6 +269,11 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor } } + @Override + public long getBufferSize() { + return bufferSize; + } + /** * The same callback is used for Runnable executor. * This way we can save some memory over the pool. diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java index a05d3229e7..2c7fd3e3c1 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java @@ -41,6 +41,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory private boolean bufferPooling; //pools only the biggest one -> optimized for the common case private final ThreadLocal bytesPool; + private final int bufferSize; private MappedSequentialFileFactory(File directory, int capacity, @@ -57,6 +58,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory } else { timedBuffer = null; } + this.bufferSize = bufferSize; this.bufferPooling = true; this.bytesPool = new ThreadLocal<>(); } @@ -105,6 +107,11 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory return useDataSync; } + @Override + public long getBufferSize() { + return bufferSize; + } + @Override public int getMaxIO() { return 1; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java index 781176e1f2..a814ea0bac 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java @@ -203,4 +203,9 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor return bytes; } + @Override + public long getBufferSize() { + return bufferSize; + } + } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index 5c03611f21..3da17feb7b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -279,4 +279,10 @@ public interface Journal extends ActiveMQComponent { * It will make sure there are no more pending operations on the Executors. * */ void flush() throws Exception; + + /** + * The max size record that can be stored in the journal + * @return + */ + long getMaxRecordSize(); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 9dafd4b049..73540676e0 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -103,6 +103,16 @@ public final class FileWrapperJournal extends JournalBase { public void flush() throws Exception { } + /** + * The max size record that can be stored in the journal + * + * @return + */ + @Override + public long getMaxRecordSize() { + return journal.getMaxRecordSize(); + } + /** * Write the record to the current file. */ diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 7b6f48dd42..3043b9770e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -2200,6 +2200,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal flushExecutor(compactorExecutor); } + /** + * The max size record that can be stored in the journal + * + * @return + */ + @Override + public long getMaxRecordSize() { + return Math.min(getFileSize(), fileFactory.getBufferSize()); + } + private void flushExecutor(Executor executor) throws InterruptedException { if (executor != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java index 3cfc684662..83112d5a33 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java @@ -548,4 +548,9 @@ public class ReplicatedJournal implements Journal { public void replicationSyncFinished() { throw new UnsupportedOperationException("should never get called"); } + + @Override + public long getMaxRecordSize() { + return localJournal.getMaxRecordSize(); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index a014c12455..d4bbe76c9f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -16,13 +16,12 @@ */ package org.apache.activemq.artemis.core.server; +import javax.json.JsonArrayBuilder; +import javax.transaction.xa.Xid; import java.util.List; import java.util.Map; import java.util.Set; -import javax.json.JsonArrayBuilder; -import javax.transaction.xa.Xid; - import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index fd96415592..781334192c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -16,8 +16,10 @@ */ package org.apache.activemq.artemis.core.server.impl; -import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe; - +import javax.json.JsonArrayBuilder; +import javax.json.JsonObjectBuilder; +import javax.transaction.xa.XAException; +import javax.transaction.xa.Xid; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -29,16 +31,13 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import javax.json.JsonArrayBuilder; -import javax.json.JsonObjectBuilder; -import javax.transaction.xa.XAException; -import javax.transaction.xa.Xid; - import org.apache.activemq.artemis.Closeable; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.RoutingType; @@ -69,6 +68,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult; +import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueQueryResult; @@ -93,6 +93,8 @@ import org.apache.activemq.artemis.utils.PrefixUtil; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe; + /** * Server side Session implementation */ @@ -1309,12 +1311,34 @@ public class ServerSessionImpl implements ServerSession, FailureListener { return send(getCurrentTransaction(), message, direct, noAutoCreateQueue); } + + private LargeServerMessage messageToLargeMessage(Message message) throws Exception { + ICoreMessage coreMessage = message.toCore(); + LargeServerMessage lsm = getStorageManager().createLargeMessage(storageManager.generateID(), coreMessage); + + ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer(); + byte[] body = new byte[buffer.readableBytes()]; + buffer.readBytes(body); + lsm.addBytes(body); + lsm.releaseResources(); + lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, body.length); + return lsm; + } + + @Override public synchronized RoutingStatus send(Transaction tx, - final Message message, + Message msg, final boolean direct, boolean noAutoCreateQueue) throws Exception { + final Message message; + if ((msg.getEncodeSize() > storageManager.getMessageJournal().getMaxRecordSize()) && !msg.isLargeMessage()) { + message = messageToLargeMessage(msg); + } else { + message = msg; + } + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue) : null); // If the protocol doesn't support flow control, we have no choice other than fail the communication diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java new file mode 100644 index 0000000000..35cea1befd --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.junit.Before; +import org.junit.Test; + +public class OpenWireLargeMessageTest extends BasicOpenWireTest { + + public OpenWireLargeMessageTest() { + super(); + } + + public SimpleString lmAddress = new SimpleString("LargeMessageAddress"); + + @Override + @Before + public void setUp() throws Exception { + this.realStore = true; + super.setUp(); + server.createQueue(lmAddress, RoutingType.ANYCAST, lmAddress, null, true, false); + } + + @Test + public void testSendLargeMessage() throws Exception { + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(lmAddress.toString()); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + // Create 1MB Message + int size = 1024 * 1024; + byte[] bytes = new byte[size]; + BytesMessage message = session.createBytesMessage(); + message.writeBytes(bytes); + producer.send(message); + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 05f3730e3d..d35174b30a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -712,6 +713,11 @@ public final class ReplicationTest extends ActiveMQTestBase { } + @Override + public long getMaxRecordSize() { + return ActiveMQDefaultConfiguration.getDefaultJournalBufferSizeAio(); + } + @Override public void appendCommitRecord(final long txID, final boolean sync) throws Exception { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java index 192454e839..e05b5d0962 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.artemis.ArtemisConstants; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -69,6 +70,11 @@ public class FakeSequentialFileFactory implements SequentialFileFactory { return false; } + @Override + public long getBufferSize() { + return ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO; + } + @Override public int getMaxIO() { return 1;