diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 11d1a2148c..d10f3b86e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -19,12 +19,13 @@ package org.apache.activemq.artemis.core.persistence.impl.journal; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.message.impl.CoreMessage; @@ -205,9 +206,10 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe public ActiveMQBuffer getReadOnlyBodyBuffer() { try { file.open(); - ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer((int) file.size()); - file.read(buffer.toByteBuffer()); - return buffer; + int fileSize = (int) file.size(); + ByteBuffer buffer = this.storageManager.largeMessagesFactory.newBuffer(fileSize); + file.read(buffer); + return new ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer)); } catch (Exception e) { throw new RuntimeException(e); } finally { @@ -215,7 +217,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe file.close(); } catch (Exception ignored) { } - } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java index 35cea1befd..9535cc5797 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.openwire; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; @@ -60,4 +61,34 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest { producer.send(message); } } + + @Test + public void testSendReceiveLargeMessage() throws Exception { + try (Connection connection = factory.createConnection()) { + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(lmAddress.toString()); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + // Create 1MB Message + int size = 1024 * 1024; + byte[] bytes = new byte[size]; + bytes[0] = 1; + + BytesMessage message = session.createBytesMessage(); + message.writeBytes(bytes); + producer.send(message); + + MessageConsumer consumer = session.createConsumer(queue); + BytesMessage m = (BytesMessage) consumer.receive(); + assertNotNull(m); + + byte[] body = new byte[size]; + m.readBytes(body); + + assertArrayEquals(body, bytes); + } + } }