diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index b921574af3..8c63bf9c4d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -57,20 +57,9 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage @Override public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { - LargeBodyReader reader = largeBody.getLargeBodyReader(); try { - long size = reader.getSize(); - if (size > Integer.MAX_VALUE) { - throw new RuntimeException("AMQP Large Message Body is too large to be converted into core"); - } - byte[] buffer = new byte[(int)size]; - ByteBuffer wrapbuffer = ByteBuffer.wrap(buffer); - - reader.open(); - reader.readInto(wrapbuffer); - - AMQPStandardMessage standardMessage = new AMQPStandardMessage(messageFormat, buffer, extraProperties, coreMessageObjectPools); + AMQPStandardMessage standardMessage = new AMQPStandardMessage(messageFormat, getData().array(), extraProperties, coreMessageObjectPools); if (this.getExpiration() > 0) { standardMessage.reloadExpiration(this.getExpiration()); } @@ -80,16 +69,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } catch (Exception e) { logger.warn(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); - } finally { - try { - reader.close(); - } catch (Exception e) { - // unexpected to happen, but possible, nothing else we can do beyond logging at this point - // if we wanted to add anything it would be a critical failure but it would be a heavy refactoring - // to bring the bits and listeners here for little benefit - // the possibility of this happening involves losing the storage device which will lead to other errors anyway - logger.warn(e.getMessage(), e); - } } } @@ -308,7 +287,28 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage @Override public ReadableBuffer getData() { - throw new UnsupportedOperationException("Method not supported with Large Messages"); + LargeBodyReader reader = largeBody.getLargeBodyReader(); + + try { + long size = reader.getSize(); + if (size > Integer.MAX_VALUE) { + throw new RuntimeException("AMQP Large Message Body is too large to be read into memory"); + } + byte[] buffer = new byte[(int) size]; + ByteBuffer wrapbuffer = ByteBuffer.wrap(buffer); + + reader.open(); + reader.readInto(wrapbuffer); + return new ReadableBuffer.ByteBufferReader(wrapbuffer.rewind()); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } finally { + try { + reader.close(); + } catch (Exception ignored) { + logger.debug(ignored.getMessage(), ignored); + } + } } public void parseHeader(ReadableBuffer buffer) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 2024bc13b3..c707bf8b3f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -1355,7 +1355,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. @Override public boolean hasScheduledDeliveryTime() { if (scheduledTime >= 0) { - return true; + return scheduledTime > 0; } return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS, SCHEDULED_DELIVERY_NEEDLES); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java index b22faa9562..a610d87e8f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -41,11 +42,14 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.message.LargeBodyReader; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -59,6 +63,7 @@ import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Assert; import org.junit.Assume; @@ -141,6 +146,47 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { } } + @Test(timeout = 60000) + public void testSendAndGetData() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + int nMsgs = 1; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + sendMessages(nMsgs, connection); + + int count = getMessageCount(server.getPostOffice(), testQueueName); + assertEquals(nMsgs, count); + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(testQueueName); + serverQueue.forEach(ref -> { + try { + AMQPLargeMessage message = (AMQPLargeMessage) ref.getMessage(); + Assert.assertFalse(message.hasScheduledDeliveryTime()); + ReadableBuffer dataBuffer = message.getData(); + LargeBodyReader reader = message.getLargeBodyReader(); + try { + Assert.assertEquals(reader.getSize(), dataBuffer.remaining()); + reader.open(); + ByteBuffer buffer = ByteBuffer.allocate(dataBuffer.remaining()); + reader.readInto(buffer); + ByteUtil.equals(buffer.array(), dataBuffer.array()); + } finally { + reader.close(); + } + } catch (AssertionError assertionError) { + throw assertionError; + } catch (Throwable e) { + throw new RuntimeException(e.getMessage(), e); + } + + }); + } finally { + connection.close(); + } + } + @Test(timeout = 60000) public void testSendAMQPMessageWithComplexAnnotationsReceiveCore() throws Exception { server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java index 2c7b92a206..54d3ab8367 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -94,9 +94,7 @@ public class MessageRedistributionTest extends ClusterTestBase { setupSessionFactory(0, isNetty()); setupSessionFactory(1, isNetty()); - this. - - createQueue(0, "queues.testaddress", "queue0", null, false); + createQueue(0, "queues.testaddress", "queue0", null, false); createQueue(1, "queues.testaddress", "queue0", null, false); addConsumer(1, 1, "queue0", null);