diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index 4bc7ad3b18..18b76b506b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -17,12 +17,13 @@ package org.apache.activemq.usecases; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Random; -import java.util.HashSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -31,7 +32,9 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.management.ObjectName; + import junit.framework.Test; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; @@ -43,9 +46,9 @@ import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.util.Wait; import org.apache.activemq.store.kahadb.disk.journal.Journal; import org.apache.activemq.store.kahadb.disk.page.PageFile; +import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,8 +60,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp public boolean keepDurableSubsActive = true; private BrokerService broker; private ActiveMQTopic topic; - private List exceptions = new ArrayList(); + private final List exceptions = new ArrayList(); + @Override protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true)); connectionFactory.setWatchTopicAdvisories(false); @@ -80,7 +84,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp public static Test suite() { return suite(DurableSubscriptionOfflineTest.class); } - + + @Override protected void setUp() throws Exception { exceptions.clear(); topic = (ActiveMQTopic) createDestination(); @@ -88,6 +93,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp super.setUp(); } + @Override protected void tearDown() throws Exception { super.tearDown(); destroyBroker(); @@ -96,7 +102,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp private void createBroker() throws Exception { createBroker(true); } - + private void createBroker(boolean deleteAllMessages) throws Exception { broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")"); broker.setBrokerName(getName(true)); @@ -113,7 +119,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp policyMap.setDefaultEntry(policy); broker.setDestinationPolicy(policyMap); } - + setDefaultPersistenceAdapter(broker); if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) { // ensure it kicks in during tests @@ -220,7 +226,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals(sent, listener.count); } - + public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception { this.addCombinationValues("defaultPersistenceAdapter", new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC}); @@ -604,7 +610,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con.close(); assertEquals("offline consumer got all", sent, listener.count); - } + } public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception { this.addCombinationValues("defaultPersistenceAdapter", @@ -811,7 +817,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp this.addCombinationValues("defaultPersistenceAdapter", new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC}); } - + public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception { // create offline subs 1 Connection con = createConnection("offCli1"); @@ -852,7 +858,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp Thread.sleep(3 * 1000); broker.stop(); createBroker(false /*deleteAllMessages*/); - + // send more messages con = createConnection(); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -1343,7 +1349,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp Thread.sleep(1 * 1000); session.close(); con.close(); - + LOG.info("cli1 again, should get 1 new ones"); con = createConnection("cli1"); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -1491,6 +1497,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp final Listener listener = new Listener(); consumer.setMessageListener(listener); assertTrue("got all sent", Wait.waitFor(new Wait.Condition() { + @Override public boolean isSatisified() throws Exception { LOG.info("Want: " + toSend + ", current: " + listener.count); return listener.count == toSend; @@ -1502,7 +1509,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp destroyBroker(); createBroker(false); KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size()); + assertEquals("only two journal file(s) left after restart", 2, pa.getStore().getJournal().getFileMap().size()); } // https://issues.apache.org/jira/browse/AMQ-3768 @@ -1674,6 +1681,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp Listener(String id) { this.id = id; } + @Override public void onMessage(Message message) { count++; if (id != null) { @@ -1686,6 +1694,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp public class FilterCheckListener extends Listener { + @Override public void onMessage(Message message) { count++;