ARTEMIS-2294 dupe detection for AMQP same as core

This commit is contained in:
Justin Bertram 2019-04-04 11:21:08 -05:00 committed by Clebert Suconic
parent 8c2d89183e
commit 2dd0671698
3 changed files with 48 additions and 1 deletions

View File

@ -971,7 +971,7 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public Object getDuplicateProperty() { public Object getDuplicateProperty() {
return null; return getObjectProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID);
} }
@Override @Override

View File

@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
@ -182,6 +183,28 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
@Test(timeout = 60000)
public void testDuplicateDetection() throws Exception {
final int MSG_COUNT = 10;
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName(), true);
for (int i = 1; i <= MSG_COUNT; ++i) {
AmqpMessage message = new AmqpMessage();
message.setApplicationProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString(), "123");
sender.send(message);
}
Wait.assertTrue("Only 1 message should arrive", () -> getProxyToQueue(getQueueName()).getMessageCount() == 1);
sender.close();
connection.close();
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testSenderCreditReplenishment() throws Exception { public void testSenderCreditReplenishment() throws Exception {
AtomicInteger counter = new AtomicInteger(); AtomicInteger counter = new AtomicInteger();

View File

@ -30,6 +30,7 @@ import javax.jms.TemporaryQueue;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -96,6 +97,29 @@ public class JMSMessageProducerTest extends JMSClientTestSupport {
} }
} }
@Test(timeout = 30000)
public void testDuplicateDetection() throws Exception {
final int MSG_COUNT = 10;
Connection connection = createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer p = session.createProducer(null);
for (int i = 1; i <= MSG_COUNT; ++i) {
TextMessage message = session.createTextMessage();
message.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), "123");
// this will auto-create the address
p.send(queue, message);
}
Wait.assertTrue("Only 1 message should arrive", () -> getProxyToQueue(getQueueName()).getMessageCount() == 1);
} finally {
connection.close();
}
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testAnonymousProducerAcrossManyDestinations() throws Exception { public void testAnonymousProducerAcrossManyDestinations() throws Exception {
Connection connection = createConnection(); Connection connection = createConnection();