mirror of https://github.com/apache/activemq.git
NetworkBrokerDetachTest refactoring
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@934871 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fab6ba187d
commit
db9533b631
|
@ -48,6 +48,8 @@ import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
|||
import org.apache.activemq.util.Wait;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class NetworkBrokerDetachTest {
|
||||
|
@ -62,6 +64,9 @@ public class NetworkBrokerDetachTest {
|
|||
protected final int networkTTL = 2;
|
||||
protected final boolean dynamicOnly = false;
|
||||
|
||||
protected BrokerService broker;
|
||||
protected BrokerService networkedBroker;
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = new BrokerService();
|
||||
broker.setBrokerName(BROKER_NAME);
|
||||
|
@ -96,20 +101,34 @@ public class NetworkBrokerDetachTest {
|
|||
//broker.setPersistenceAdapter(persistenceAdapter);
|
||||
|
||||
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
|
||||
persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/NetworBrokerDetatchTest"));
|
||||
persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/" + broker.getBrokerName() + "NetworBrokerDetatchTest"));
|
||||
broker.setPersistenceAdapter(persistenceAdapter);
|
||||
|
||||
// default AMQ
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNetworkedBrokerDetach() throws Exception {
|
||||
BrokerService broker = createBroker();
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
broker = createBroker();
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
broker.start();
|
||||
|
||||
BrokerService networkedBroker = createNetworkedBroker();
|
||||
networkedBroker = createNetworkedBroker();
|
||||
networkedBroker.setDeleteAllMessagesOnStartup(true);
|
||||
networkedBroker.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() throws Exception {
|
||||
networkedBroker.stop();
|
||||
networkedBroker.waitUntilStopped();
|
||||
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNetworkedBrokerDetach() throws Exception {
|
||||
LOG.info("Creating Consumer on the networked broker ...");
|
||||
// Create a consumer on the networked broker
|
||||
ConnectionFactory consFactory = createConnectionFactory(networkedBroker);
|
||||
|
@ -130,23 +149,11 @@ public class NetworkBrokerDetachTest {
|
|||
|
||||
// We should have 0 consumer for the queue on the local broker
|
||||
assertTrue("got expected 0 count from mbean within time limit", verifyConsumerCount(0, destination, BROKER_NAME));
|
||||
|
||||
networkedBroker.stop();
|
||||
networkedBroker.waitUntilStopped();
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testNetworkedBrokerDurableSubAfterRestart() throws Exception {
|
||||
BrokerService brokerOne = createBroker();
|
||||
brokerOne.setDeleteAllMessagesOnStartup(true);
|
||||
brokerOne.start();
|
||||
|
||||
BrokerService brokerTwo = createNetworkedBroker();
|
||||
brokerTwo.setDeleteAllMessagesOnStartup(true);
|
||||
brokerTwo.start();
|
||||
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
MessageListener counter = new MessageListener() {
|
||||
|
@ -156,8 +163,8 @@ public class NetworkBrokerDetachTest {
|
|||
};
|
||||
|
||||
LOG.info("Creating durable consumer on each broker ...");
|
||||
ActiveMQTopic destination = registerDurableConsumer(brokerTwo, counter);
|
||||
registerDurableConsumer(brokerOne, counter);
|
||||
ActiveMQTopic destination = registerDurableConsumer(networkedBroker, counter);
|
||||
registerDurableConsumer(broker, counter);
|
||||
|
||||
assertTrue("got expected consumer count from local broker mbean within time limit",
|
||||
verifyConsumerCount(2, destination, BROKER_NAME));
|
||||
|
@ -165,25 +172,25 @@ public class NetworkBrokerDetachTest {
|
|||
assertTrue("got expected consumer count from network broker mbean within time limit",
|
||||
verifyConsumerCount(2, destination, REM_BROKER_NAME));
|
||||
|
||||
sendMessageTo(destination, brokerOne);
|
||||
sendMessageTo(destination, broker);
|
||||
|
||||
assertTrue("Got one message on each", verifyMessageCount(2, count));
|
||||
|
||||
LOG.info("Stopping brokerTwo...");
|
||||
brokerTwo.stop();
|
||||
brokerTwo.waitUntilStopped();
|
||||
networkedBroker.stop();
|
||||
networkedBroker.waitUntilStopped();
|
||||
|
||||
LOG.info("restarting broker Two...");
|
||||
brokerTwo = createNetworkedBroker();
|
||||
brokerTwo.start();
|
||||
networkedBroker = createNetworkedBroker();
|
||||
networkedBroker.start();
|
||||
|
||||
LOG.info("Recreating durable Consumer on the broker after restart...");
|
||||
registerDurableConsumer(brokerTwo, counter);
|
||||
registerDurableConsumer(networkedBroker, counter);
|
||||
|
||||
// give advisories a chance to percolate
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
|
||||
sendMessageTo(destination, brokerOne);
|
||||
sendMessageTo(destination, broker);
|
||||
|
||||
// expect similar after restart
|
||||
assertTrue("got expected consumer count from local broker mbean within time limit",
|
||||
|
@ -199,11 +206,6 @@ public class NetworkBrokerDetachTest {
|
|||
assertTrue("Got two more messages after restart", verifyMessageCount(4, count));
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
assertTrue("still Got just two more messages", verifyMessageCount(4, count));
|
||||
|
||||
brokerTwo.stop();
|
||||
brokerTwo.waitUntilStopped();
|
||||
brokerOne.stop();
|
||||
brokerOne.waitUntilStopped();
|
||||
}
|
||||
|
||||
private boolean verifyMessageCount(final int i, final AtomicInteger count) throws Exception {
|
||||
|
|
Loading…
Reference in New Issue