ARTEMIS-3833 Preserve JMSCorrelationID of distributed AMQP large messages

This commit is contained in:
Domenico Francesco Bruscino 2022-05-16 08:57:24 +02:00 committed by clebertsuconic
parent f58db5a054
commit f632e8104b
2 changed files with 38 additions and 3 deletions

View File

@ -245,13 +245,17 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
this.headerPosition = copy.headerPosition;
this.encodedHeaderSize = copy.encodedHeaderSize;
this.header = copy.header == null ? null : new Header(copy.header);
this.deliveryAnnotationsPosition = copy.deliveryAnnotationsPosition;
this.encodedDeliveryAnnotationsSize = copy.encodedDeliveryAnnotationsSize;
this.deliveryAnnotations = copy.deliveryAnnotations == null ? null : new DeliveryAnnotations(copy.deliveryAnnotations.getValue());
this.messageAnnotationsPosition = copy.messageAnnotationsPosition;
this.messageAnnotations = copy.messageAnnotations == null ? null : new MessageAnnotations(copy.messageAnnotations.getValue());
this.propertiesPosition = copy.propertiesPosition;
this.properties = copy.properties == null ? null : new Properties(copy.properties);
this.applicationPropertiesPosition = copy.applicationPropertiesPosition;
this.applicationProperties = copy.applicationProperties == null ? null : new ApplicationProperties(copy.applicationProperties.getValue());
this.remainingBodyPosition = copy.remainingBodyPosition;
this.applicationProperties = copy.applicationProperties;
this.messageDataScanned = copy.messageDataScanned;
}

View File

@ -21,12 +21,14 @@ import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@ -80,6 +82,29 @@ public class AMQPLargeMessageClusterTest extends ClusterTestBase {
@Test(timeout = RECEIVE_TIMEOUT_MILLIS * (MESSAGES + 1))
public void testSendReceiveLargeMessage() throws Exception {
testSendReceiveLargeMessage(message -> { }, message -> { });
}
@Test(timeout = RECEIVE_TIMEOUT_MILLIS * (MESSAGES + 1))
public void testSendReceiveLargeMessageWithJMSCorrelationID() throws Exception {
final String jmsCorrelationID = "123456";
testSendReceiveLargeMessage(message -> {
try {
message.setJMSCorrelationID(jmsCorrelationID);
} catch (JMSException e) {
fail("Exception not expected: " + e);
}
}, message -> {
try {
Assert.assertEquals(jmsCorrelationID, message.getJMSCorrelationID());
} catch (JMSException e) {
fail("Exception not expected: " + e);
}
});
}
private void testSendReceiveLargeMessage(Consumer<Message> beforeSending, Consumer<Message> afterReceiving) throws Exception {
setupCluster(MessageLoadBalancingType.ON_DEMAND);
startServers(0, 1);
@ -99,12 +124,16 @@ public class AMQPLargeMessageClusterTest extends ClusterTestBase {
String producerUri = "amqp://localhost:61616";
final JmsConnectionFactory producerFactory = new JmsConnectionFactory(producerUri);
try (Connection producerConnection = producerFactory.createConnection(); Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
try (Connection producerConnection = producerFactory.createConnection();
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
producerConnection.start();
final Destination queue = producerSession.createQueue(queueName);
String consumerUri = "amqp://localhost:61617";
final JmsConnectionFactory consumerConnectionFactory = new JmsConnectionFactory(consumerUri);
try (Connection consumerConnection = consumerConnectionFactory.createConnection(); Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = consumerSession.createConsumer(queue); MessageProducer producer = producerSession.createProducer(queue)) {
try (Connection consumerConnection = consumerConnectionFactory.createConnection();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageProducer producer = producerSession.createProducer(queue)) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
consumerConnection.start();
final byte[] largeMessageContent = new byte[MESSAGE_SIZE];
@ -113,6 +142,7 @@ public class AMQPLargeMessageClusterTest extends ClusterTestBase {
for (int i = 0; i < MESSAGES; i++) {
final BytesMessage sentMessage = producerSession.createBytesMessage();
sentMessage.writeBytes(largeMessageContent);
beforeSending.accept(sentMessage);
producer.send(sentMessage);
final Message receivedMessage = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
Assert.assertNotNull("A message should be received in " + RECEIVE_TIMEOUT_MILLIS + " ms", receivedMessage);
@ -124,6 +154,7 @@ public class AMQPLargeMessageClusterTest extends ClusterTestBase {
e.printStackTrace();
System.exit(-1);
}
afterReceiving.accept(receivedMessage);
}
}
}