diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java index b2e969a203..5002521782 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java @@ -16,19 +16,21 @@ */ package org.apache.activemq.bugs; +import java.io.File; +import java.io.FilenameFilter; +import java.util.Arrays; import java.util.Properties; +import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.atomic.AtomicLong; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; - -import junit.framework.Test; - +import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; @@ -53,95 +55,134 @@ public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport { ActiveMQTopic topic; ActiveMQConnection consumerConnection = null, producerConnection = null, dlqConnection = null; + Session consumerSession; Session producerSession; MessageProducer producer; - final int minPercentUsageForStore = 10; + Vector duralbeSubs = new Vector(); final int numMessages = 1000; + final int numDurableSubs = 2; String data; + private long dlqConsumerLastReceivedTimeStamp; + private AtomicLong dlqReceivedCount = new AtomicLong(0); + + // 2 deliveries of each message, 3 producers + CountDownLatch redeliveryConsumerLatch = new CountDownLatch(((2 * numMessages) * numDurableSubs) - 1); + // should get at least numMessages, possibly more + CountDownLatch dlqConsumerLatch = new CountDownLatch((numMessages - 1)); public void testSize() throws Exception { - CountDownLatch redeliveryConsumerLatch = new CountDownLatch(((2*numMessages) *3) -1); - CountDownLatch dlqConsumerLatch = new CountDownLatch((numMessages) -1); openConsumer(redeliveryConsumerLatch); openDlqConsumer(dlqConsumerLatch); - + assertEquals(0, broker.getAdminView().getStorePercentUsage()); for (int i = 0; i < numMessages; i++) { sendMessage(false); } - + final BrokerView brokerView = broker.getAdminView(); broker.getSystemUsage().getStoreUsage().isFull(); - LOG.info("store percent usage: "+brokerView.getStorePercentUsage()); - //assertTrue("some store in use", broker.getAdminView().getStorePercentUsage() > minPercentUsageForStore); - assertTrue("redelivery consumer got all it needs", redeliveryConsumerLatch.await(60, TimeUnit.SECONDS)); + LOG.info("store percent usage: " + brokerView.getStorePercentUsage()); + assertTrue("redelivery consumer got all it needs, remaining: " + + redeliveryConsumerLatch.getCount(), redeliveryConsumerLatch.await(60, TimeUnit.SECONDS)); assertTrue("dql consumer got all it needs", dlqConsumerLatch.await(60, TimeUnit.SECONDS)); closeConsumer(); LOG.info("Giving dlq a chance to clear down once topic consumer is closed"); + + // consumer all of the duplicates that arrived after the first ack + closeDlqConsumer(); + //get broker a chance to clean obsolete messages, wait 2*cleanupInterval Thread.sleep(5000); - // consumer some of the duplicates that arrived after the first ack - closeDlqConsumer(); - int numFiles = ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).getDirectory().list().length; + FilenameFilter justLogFiles = new FilenameFilter() { + public boolean accept(File file, String s) { + return s.endsWith(".log"); + } + }; + int numFiles = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getDirectory().list(justLogFiles).length; + if (numFiles > 2) { + LOG.info(Arrays.toString(((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getDirectory().list(justLogFiles))); + } LOG.info("num files: " + numFiles); - assertTrue("kahaDB dir should contain few db files,but definitely less than 10, is: " + numFiles,10>numFiles); - } - + assertEquals("kahaDB dir should contain 1 db file,is: " + numFiles, 1, numFiles); + } private void openConsumer(final CountDownLatch latch) throws Exception { consumerConnection = (ActiveMQConnection) createConnection(); consumerConnection.setClientID("cliID"); consumerConnection.start(); - final Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageListener listener = new MessageListener() { public void onMessage(Message message) { latch.countDown(); try { - session.recover(); + consumerSession.recover(); } catch (Exception ignored) { ignored.printStackTrace(); } - } }; - session.createDurableSubscriber(topic, "subName1").setMessageListener(listener); - session.createDurableSubscriber(topic, "subName2").setMessageListener(listener); - session.createDurableSubscriber(topic, "subName3").setMessageListener(listener); + for (int i = 1; i <= numDurableSubs; i++) { + TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, "subName" + i); + sub.setMessageListener(listener); + duralbeSubs.add(sub); + } } - private void openDlqConsumer(final CountDownLatch received)throws Exception{ - - dlqConnection = (ActiveMQConnection) createConnection(); - Session dlqSession = dlqConnection .createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); - dlqConsumer.setMessageListener(new MessageListener() { - public void onMessage(Message message) { - if (received.getCount() % 200 == 0) { - LOG.info("remaining on DLQ: " + received.getCount()); - } - received.countDown(); - } - }); - dlqConnection.start(); - } - - + + private void openDlqConsumer(final CountDownLatch received) throws Exception { + + dlqConnection = (ActiveMQConnection) createConnection(); + Session dlqSession = dlqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); + dlqConsumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + if (received.getCount() > 0 && received.getCount() % 200 == 0) { + LOG.info("remaining on DLQ: " + received.getCount()); + } + received.countDown(); + dlqConsumerLastReceivedTimeStamp = System.currentTimeMillis(); + dlqReceivedCount.incrementAndGet(); + } + }); + dlqConnection.start(); + } + + private void closeConsumer() throws JMSException { - if (consumerConnection != null) + for (TopicSubscriber sub : duralbeSubs) { + sub.close(); + } + if (consumerSession != null) { + for (int i = 1; i <= numDurableSubs; i++) { + consumerSession.unsubscribe("subName" + i); + } + } + if (consumerConnection != null) { consumerConnection.close(); - consumerConnection = null; + consumerConnection = null; + } } - private void closeDlqConsumer() throws JMSException { - if (dlqConnection != null) - dlqConnection.close(); - dlqConnection = null; + + private void closeDlqConsumer() throws JMSException, InterruptedException { + final long limit = System.currentTimeMillis() + 30 * 1000; + if (dlqConsumerLastReceivedTimeStamp > 0) { + while (System.currentTimeMillis() < dlqConsumerLastReceivedTimeStamp + 5000 + && System.currentTimeMillis() < limit) { + LOG.info("waiting for DLQ do drain, receivedCount: " + dlqReceivedCount); + TimeUnit.SECONDS.sleep(1); + } + } + if (dlqConnection != null) { + dlqConnection.close(); + dlqConnection = null; + } } private void sendMessage(boolean filter) throws Exception { @@ -171,10 +212,6 @@ public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport { if (deleteMessages) { broker.setDeleteAllMessagesOnStartup(true); } - KahaDBPersistenceAdapter persistenceAdapter=new KahaDBPersistenceAdapter(); - persistenceAdapter.setEnableJournalDiskSyncs(false); - - broker.setPersistenceAdapter(persistenceAdapter); configurePersistenceAdapter(broker.getPersistenceAdapter()); broker.getSystemUsage().getStoreUsage().setLimit(200 * 1000 * 1000); broker.start(); @@ -187,7 +224,8 @@ public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport { properties.put("maxFileLength", maxFileLengthVal); properties.put("cleanupInterval", "2000"); properties.put("checkpointInterval", "2000"); - + properties.put("concurrentStoreAndDispatchQueues", "false"); + IntrospectionSupport.setProperties(persistenceAdapter, properties); }