AMQ-6413 - ensure audit update on skipped store add for kahadb concurrentStoreAndDispatch. Fix and test

This commit is contained in:
gtully 2016-09-01 16:46:21 +01:00
parent ed0e786b60
commit f8bc19b96d
2 changed files with 116 additions and 0 deletions

View File

@ -421,6 +421,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
removeMessage(context, ack);
} else {
indexLock.writeLock().lock();
try {
metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId());
} finally {
indexLock.writeLock().unlock();
}
synchronized (asyncTaskMap) {
asyncTaskMap.remove(key);
}

View File

@ -20,7 +20,10 @@ package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -35,6 +38,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@ -224,4 +229,109 @@ public class AMQ5212Test {
activeMQConnection.close();
}
@Test
public void verifyProducerAudit() throws Exception {
MutableBrokerFilter filter = (MutableBrokerFilter)brokerService.getBroker().getAdaptor(MutableBrokerFilter.class);
filter.setNext(new MutableBrokerFilter(filter.getNext()) {
@Override
public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message messageSend) throws Exception {
super.send(producerExchange, messageSend);
Object seq = messageSend.getProperty("seq");
if (seq instanceof Integer) {
if ( ((Integer) seq).intValue() %200 == 0 && producerExchange.getConnectionContext().getConnection() != null) {
producerExchange.getConnectionContext().setDontSendReponse(true);
producerExchange.getConnectionContext().getConnection().serviceException(new IOException("force reconnect"));
}
}
}
});
final AtomicInteger received = new AtomicInteger(0);
final ActiveMQQueue dest = new ActiveMQQueue("Q");
final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover://" + brokerService.getTransportConnectors().get(0).getPublishableConnectString());
connectionFactory.setCopyMessageOnSend(false);
connectionFactory.setWatchTopicAdvisories(false);
final int numConsumers = 40;
ExecutorService executorService = Executors.newCachedThreadPool();
final CountDownLatch consumerStarted = new CountDownLatch(numConsumers);
final ConcurrentLinkedQueue<ActiveMQConnection> connectionList = new ConcurrentLinkedQueue<ActiveMQConnection>();
for (int i=0; i<numConsumers; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
activeMQConnection.getPrefetchPolicy().setAll(0);
activeMQConnection.start();
connectionList.add(activeMQConnection);
ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = activeMQSession.createConsumer(dest);
consumerStarted.countDown();
while (true) {
if(messageConsumer.receive(500) != null) {
received.incrementAndGet();
}
}
} catch (javax.jms.IllegalStateException expected) {
} catch (Exception ignored) {
ignored.printStackTrace();
}
}
});
}
final String payload = new String(new byte[8 * 1024]);
final int totalToProduce = 5000;
final AtomicInteger toSend = new AtomicInteger(totalToProduce);
final int numProducers = 10;
final CountDownLatch producerDone = new CountDownLatch(numProducers);
for (int i=0;i<numProducers;i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
ActiveMQConnection activeMQConnectionP = (ActiveMQConnection) connectionFactory.createConnection();
activeMQConnectionP.start();
ActiveMQSession activeMQSessionP = (ActiveMQSession) activeMQConnectionP.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSessionP.createProducer(dest);
int seq = 0;
while ((seq = toSend.decrementAndGet()) >= 0) {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(payload);
message.setIntProperty("seq", seq);
activeMQMessageProducer.send(message);
}
activeMQConnectionP.close();
} catch (Exception ignored) {
ignored.printStackTrace();
} finally {
producerDone.countDown();
}
}
});
}
consumerStarted.await(10, TimeUnit.MINUTES);
producerDone.await(10, TimeUnit.MINUTES);
for (ActiveMQConnection c : connectionList) {
c.close();
}
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.MINUTES);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerService.getAdminView().getTotalEnqueueCount() >= totalToProduce;
}
});
assertEquals("total enqueue as expected, nothing added to dlq", totalToProduce, brokerService.getAdminView().getTotalEnqueueCount());
}
}