The producerAudit is only written out on change so in this test it holds onto log 5 meaning there are two left instead of one.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1434055 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-16 17:24:38 +00:00
parent 1b27f95393
commit 59d13ad913
1 changed files with 21 additions and 12 deletions

View File

@ -17,12 +17,13 @@
package org.apache.activemq.usecases; package org.apache.activemq.usecases;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.HashSet;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
@ -31,7 +32,9 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.management.ObjectName; import javax.management.ObjectName;
import junit.framework.Test; import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -43,9 +46,9 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.activemq.store.kahadb.disk.journal.Journal; import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.page.PageFile; import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -57,8 +60,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
public boolean keepDurableSubsActive = true; public boolean keepDurableSubsActive = true;
private BrokerService broker; private BrokerService broker;
private ActiveMQTopic topic; private ActiveMQTopic topic;
private List<Throwable> exceptions = new ArrayList<Throwable>(); private final List<Throwable> exceptions = new ArrayList<Throwable>();
@Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true)); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
connectionFactory.setWatchTopicAdvisories(false); connectionFactory.setWatchTopicAdvisories(false);
@ -80,7 +84,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
public static Test suite() { public static Test suite() {
return suite(DurableSubscriptionOfflineTest.class); return suite(DurableSubscriptionOfflineTest.class);
} }
@Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
exceptions.clear(); exceptions.clear();
topic = (ActiveMQTopic) createDestination(); topic = (ActiveMQTopic) createDestination();
@ -88,6 +93,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
super.setUp(); super.setUp();
} }
@Override
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
super.tearDown(); super.tearDown();
destroyBroker(); destroyBroker();
@ -96,7 +102,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
private void createBroker() throws Exception { private void createBroker() throws Exception {
createBroker(true); createBroker(true);
} }
private void createBroker(boolean deleteAllMessages) throws Exception { private void createBroker(boolean deleteAllMessages) throws Exception {
broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")"); broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
broker.setBrokerName(getName(true)); broker.setBrokerName(getName(true));
@ -113,7 +119,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
policyMap.setDefaultEntry(policy); policyMap.setDefaultEntry(policy);
broker.setDestinationPolicy(policyMap); broker.setDestinationPolicy(policyMap);
} }
setDefaultPersistenceAdapter(broker); setDefaultPersistenceAdapter(broker);
if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) { if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
// ensure it kicks in during tests // ensure it kicks in during tests
@ -220,7 +226,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(sent, listener.count); assertEquals(sent, listener.count);
} }
public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception { public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter", this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC}); new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
@ -604,7 +610,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con.close(); con.close();
assertEquals("offline consumer got all", sent, listener.count); assertEquals("offline consumer got all", sent, listener.count);
} }
public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception { public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter", this.addCombinationValues("defaultPersistenceAdapter",
@ -811,7 +817,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
this.addCombinationValues("defaultPersistenceAdapter", this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC}); new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
} }
public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception { public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
// create offline subs 1 // create offline subs 1
Connection con = createConnection("offCli1"); Connection con = createConnection("offCli1");
@ -852,7 +858,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
Thread.sleep(3 * 1000); Thread.sleep(3 * 1000);
broker.stop(); broker.stop();
createBroker(false /*deleteAllMessages*/); createBroker(false /*deleteAllMessages*/);
// send more messages // send more messages
con = createConnection(); con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -1343,7 +1349,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
Thread.sleep(1 * 1000); Thread.sleep(1 * 1000);
session.close(); session.close();
con.close(); con.close();
LOG.info("cli1 again, should get 1 new ones"); LOG.info("cli1 again, should get 1 new ones");
con = createConnection("cli1"); con = createConnection("cli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -1491,6 +1497,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
final Listener listener = new Listener(); final Listener listener = new Listener();
consumer.setMessageListener(listener); consumer.setMessageListener(listener);
assertTrue("got all sent", Wait.waitFor(new Wait.Condition() { assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("Want: " + toSend + ", current: " + listener.count); LOG.info("Want: " + toSend + ", current: " + listener.count);
return listener.count == toSend; return listener.count == toSend;
@ -1502,7 +1509,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
destroyBroker(); destroyBroker();
createBroker(false); createBroker(false);
KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size()); assertEquals("only two journal file(s) left after restart", 2, pa.getStore().getJournal().getFileMap().size());
} }
// https://issues.apache.org/jira/browse/AMQ-3768 // https://issues.apache.org/jira/browse/AMQ-3768
@ -1674,6 +1681,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
Listener(String id) { Listener(String id) {
this.id = id; this.id = id;
} }
@Override
public void onMessage(Message message) { public void onMessage(Message message) {
count++; count++;
if (id != null) { if (id != null) {
@ -1686,6 +1694,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
public class FilterCheckListener extends Listener { public class FilterCheckListener extends Listener {
@Override
public void onMessage(Message message) { public void onMessage(Message message) {
count++; count++;