From cf004c205de6eb9b69ba85a3467c3798043c6460 Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 1 Sep 2016 16:46:21 +0100 Subject: [PATCH] AMQ-6413 - ensure audit update on skipped store add for kahadb concurrentStoreAndDispatch. Fix and test (cherry picked from commit f8bc19b96da752e216de2c5c543a7d8523512a03) --- .../activemq/store/kahadb/KahaDBStore.java | 6 + .../org/apache/activemq/bugs/AMQ5212Test.java | 110 ++++++++++++++++++ 2 files changed, 116 insertions(+) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 0d20b78e33..dee5c408ef 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -421,6 +421,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } removeMessage(context, ack); } else { + indexLock.writeLock().lock(); + try { + metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId()); + } finally { + indexLock.writeLock().unlock(); + } synchronized (asyncTaskMap) { asyncTaskMap.remove(key); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java index 064a5be456..cc2602d291 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java @@ -20,7 +20,10 @@ package org.apache.activemq.bugs; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import java.io.IOException; import java.util.Arrays; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -35,6 +38,8 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageProducer; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.MutableBrokerFilter; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; @@ -224,4 +229,109 @@ public class AMQ5212Test { activeMQConnection.close(); } + + @Test + public void verifyProducerAudit() throws Exception { + + MutableBrokerFilter filter = (MutableBrokerFilter)brokerService.getBroker().getAdaptor(MutableBrokerFilter.class); + filter.setNext(new MutableBrokerFilter(filter.getNext()) { + @Override + public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message messageSend) throws Exception { + super.send(producerExchange, messageSend); + Object seq = messageSend.getProperty("seq"); + if (seq instanceof Integer) { + if ( ((Integer) seq).intValue() %200 == 0 && producerExchange.getConnectionContext().getConnection() != null) { + producerExchange.getConnectionContext().setDontSendReponse(true); + producerExchange.getConnectionContext().getConnection().serviceException(new IOException("force reconnect")); + } + } + } + }); + + final AtomicInteger received = new AtomicInteger(0); + final ActiveMQQueue dest = new ActiveMQQueue("Q"); + final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover://" + brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + final int numConsumers = 40; + ExecutorService executorService = Executors.newCachedThreadPool(); + final CountDownLatch consumerStarted = new CountDownLatch(numConsumers); + final ConcurrentLinkedQueue connectionList = new ConcurrentLinkedQueue(); + for (int i=0; i= 0) { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(payload); + message.setIntProperty("seq", seq); + activeMQMessageProducer.send(message); + } + activeMQConnectionP.close(); + } catch (Exception ignored) { + ignored.printStackTrace(); + } finally { + producerDone.countDown(); + } + } + }); + } + + consumerStarted.await(10, TimeUnit.MINUTES); + producerDone.await(10, TimeUnit.MINUTES); + + for (ActiveMQConnection c : connectionList) { + c.close(); + } + + executorService.shutdown(); + executorService.awaitTermination(10, TimeUnit.MINUTES); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getTotalEnqueueCount() >= totalToProduce; + } + }); + assertEquals("total enqueue as expected, nothing added to dlq", totalToProduce, brokerService.getAdminView().getTotalEnqueueCount()); + } }