diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index a122d7b098..df35115dd9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -971,7 +971,7 @@ public class AMQPMessage extends RefCountMessage { @Override public Object getDuplicateProperty() { - return null; + return getObjectProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java index 50d882825d..dc1b6e4cc5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; @@ -182,6 +183,28 @@ public class AmqpSenderTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testDuplicateDetection() throws Exception { + final int MSG_COUNT = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName(), true); + + for (int i = 1; i <= MSG_COUNT; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setApplicationProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString(), "123"); + sender.send(message); + } + + Wait.assertTrue("Only 1 message should arrive", () -> getProxyToQueue(getQueueName()).getMessageCount() == 1); + + sender.close(); + connection.close(); + } + @Test(timeout = 60000) public void testSenderCreditReplenishment() throws Exception { AtomicInteger counter = new AtomicInteger(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java index 95f5a4bcbd..2125ed811f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java @@ -30,6 +30,7 @@ import javax.jms.TemporaryQueue; import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Assert; import org.junit.Test; @@ -96,6 +97,29 @@ public class JMSMessageProducerTest extends JMSClientTestSupport { } } + @Test(timeout = 30000) + public void testDuplicateDetection() throws Exception { + final int MSG_COUNT = 10; + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer p = session.createProducer(null); + + for (int i = 1; i <= MSG_COUNT; ++i) { + TextMessage message = session.createTextMessage(); + message.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), "123"); + // this will auto-create the address + p.send(queue, message); + } + + Wait.assertTrue("Only 1 message should arrive", () -> getProxyToQueue(getQueueName()).getMessageCount() == 1); + } finally { + connection.close(); + } + } + @Test(timeout = 60000) public void testAnonymousProducerAcrossManyDestinations() throws Exception { Connection connection = createConnection();