ARTEMIS-3837 Adding support for AMQPLargeMessage.getData()

This commit is contained in:
Clebert Suconic 2022-05-17 19:07:37 -04:00 committed by clebertsuconic
parent 866bbbba4d
commit 9b959c2fec
4 changed files with 71 additions and 27 deletions

View File

@ -57,20 +57,9 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
@Override
public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
LargeBodyReader reader = largeBody.getLargeBodyReader();
try {
long size = reader.getSize();
if (size > Integer.MAX_VALUE) {
throw new RuntimeException("AMQP Large Message Body is too large to be converted into core");
}
byte[] buffer = new byte[(int)size];
ByteBuffer wrapbuffer = ByteBuffer.wrap(buffer);
reader.open();
reader.readInto(wrapbuffer);
AMQPStandardMessage standardMessage = new AMQPStandardMessage(messageFormat, buffer, extraProperties, coreMessageObjectPools);
AMQPStandardMessage standardMessage = new AMQPStandardMessage(messageFormat, getData().array(), extraProperties, coreMessageObjectPools);
if (this.getExpiration() > 0) {
standardMessage.reloadExpiration(this.getExpiration());
}
@ -80,16 +69,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
} catch (Exception e) {
logger.warn(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
try {
reader.close();
} catch (Exception e) {
// unexpected to happen, but possible, nothing else we can do beyond logging at this point
// if we wanted to add anything it would be a critical failure but it would be a heavy refactoring
// to bring the bits and listeners here for little benefit
// the possibility of this happening involves losing the storage device which will lead to other errors anyway
logger.warn(e.getMessage(), e);
}
}
}
@ -308,7 +287,28 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
@Override
public ReadableBuffer getData() {
throw new UnsupportedOperationException("Method not supported with Large Messages");
LargeBodyReader reader = largeBody.getLargeBodyReader();
try {
long size = reader.getSize();
if (size > Integer.MAX_VALUE) {
throw new RuntimeException("AMQP Large Message Body is too large to be read into memory");
}
byte[] buffer = new byte[(int) size];
ByteBuffer wrapbuffer = ByteBuffer.wrap(buffer);
reader.open();
reader.readInto(wrapbuffer);
return new ReadableBuffer.ByteBufferReader(wrapbuffer.rewind());
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
try {
reader.close();
} catch (Exception ignored) {
logger.debug(ignored.getMessage(), ignored);
}
}
}
public void parseHeader(ReadableBuffer buffer) {

View File

@ -1355,7 +1355,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
@Override
public boolean hasScheduledDeliveryTime() {
if (scheduledTime >= 0) {
return true;
return scheduledTime > 0;
}
return anyMessageAnnotations(SCHEDULED_DELIVERY_SYMBOLS, SCHEDULED_DELIVERY_NEEDLES);
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.amqp;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@ -41,11 +42,14 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -59,6 +63,7 @@ import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Assume;
@ -141,6 +146,47 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
}
}
@Test(timeout = 60000)
public void testSendAndGetData() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
int nMsgs = 1;
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
sendMessages(nMsgs, connection);
int count = getMessageCount(server.getPostOffice(), testQueueName);
assertEquals(nMsgs, count);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(testQueueName);
serverQueue.forEach(ref -> {
try {
AMQPLargeMessage message = (AMQPLargeMessage) ref.getMessage();
Assert.assertFalse(message.hasScheduledDeliveryTime());
ReadableBuffer dataBuffer = message.getData();
LargeBodyReader reader = message.getLargeBodyReader();
try {
Assert.assertEquals(reader.getSize(), dataBuffer.remaining());
reader.open();
ByteBuffer buffer = ByteBuffer.allocate(dataBuffer.remaining());
reader.readInto(buffer);
ByteUtil.equals(buffer.array(), dataBuffer.array());
} finally {
reader.close();
}
} catch (AssertionError assertionError) {
throw assertionError;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
}
});
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendAMQPMessageWithComplexAnnotationsReceiveCore() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));

View File

@ -94,8 +94,6 @@ public class MessageRedistributionTest extends ClusterTestBase {
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
this.
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);