added a little helper method to make it easy to wait on a broker being shut down in Java code

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@512764 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2007-02-28 14:21:29 +00:00
parent 60a526c36b
commit 44bafc3e03
2 changed files with 61 additions and 40 deletions

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
@ -145,6 +146,7 @@ public class BrokerService implements Service, Serializable {
private URI vmConnectorURI;
private PolicyMap destinationPolicy;
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean stopped = new AtomicBoolean(false);
private BrokerPlugin[] plugins;
private boolean keepDurableSubsActive=true;
private boolean useVirtualTopics=true;
@ -154,9 +156,8 @@ public class BrokerService implements Service, Serializable {
private Store tempDataStore;
private int persistenceThreadPriority = Thread.MAX_PRIORITY;
private boolean useLocalHostBrokerName = false;
private CountDownLatch stoppedLatch = new CountDownLatch(1);
/**
* Adds a new transport connector for the given bind address
*
@ -471,33 +472,27 @@ public class BrokerService implements Service, Serializable {
// to avoid timimg issue with discovery (spinning up a new instance)
BrokerRegistry.getInstance().unbind(getBrokerName());
VMTransportFactory.stopped(getBrokerName());
stopped.set(true);
stoppedLatch.countDown();
log.info("ActiveMQ JMS Message Broker ("+getBrokerName()+", "+brokerId+") stopped");
stopper.throwFirstException();
}
protected void stopAllConnectors(ServiceStopper stopper) {
for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = (NetworkConnector) iter.next();
unregisterNetworkConnectorMBean(connector);
stopper.stop(connector);
/**
* A helper method to block the caller thread until the broker has been stopped
*/
public void waitUntilStopped() {
while (!stopped.get()) {
try {
stoppedLatch.await();
}
catch (InterruptedException e) {
// ignore
}
}
}
for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) {
ProxyConnector connector = (ProxyConnector) iter.next();
stopper.stop(connector);
}
for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) {
JmsConnector connector = (JmsConnector) iter.next();
stopper.stop(connector);
}
for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = (TransportConnector) iter.next();
stopper.stop(connector);
}
}
// Properties
// -------------------------------------------------------------------------
@ -1123,6 +1118,30 @@ public class BrokerService implements Service, Serializable {
}
}
protected void stopAllConnectors(ServiceStopper stopper) {
for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = (NetworkConnector) iter.next();
unregisterNetworkConnectorMBean(connector);
stopper.stop(connector);
}
for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) {
ProxyConnector connector = (ProxyConnector) iter.next();
stopper.stop(connector);
}
for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) {
JmsConnector connector = (JmsConnector) iter.next();
stopper.stop(connector);
}
for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = (TransportConnector) iter.next();
stopper.stop(connector);
}
}
protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {

View File

@ -18,12 +18,7 @@
package org.apache.activemq.broker;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.util.UDPTraceBrokerPlugin;
import org.apache.activemq.broker.view.ConnectionDotFilePlugin;
import org.apache.activemq.broker.view.DestinationDotFilePlugin;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.demo.DefaultQueueSender;
@ -34,10 +29,11 @@ import javax.jms.Session;
/**
* A helper class which can be handy for running a broker in your IDE from the
* activemq-core module.
*
*
* @version $Revision$
*/
public class Main {
protected static boolean createConsumers = false;
/**
* @param args
@ -66,23 +62,29 @@ public class Main {
broker.addConnector("stomp://localhost:61613");
broker.start();
// lets create a dummy couple of consumers
Connection connection = new ActiveMQConnectionFactory().createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer1 = session.createConsumer(new ActiveMQQueue("Orders.IBM"));
MessageConsumer consumer2 = session.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 100");
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer3 = session2.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 200");
// lets publish some messages so that there is some stuff to browse
DefaultQueueSender.main(new String[] { "Prices.Equity.IBM" });
DefaultQueueSender.main(new String[] { "Prices.Equity.MSFT" });
DefaultQueueSender.main(new String[]{"Prices.Equity.IBM"});
DefaultQueueSender.main(new String[]{"Prices.Equity.MSFT"});
// lets create a dummy couple of consumers
if (createConsumers) {
Connection connection = new ActiveMQConnectionFactory().createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer1 = session.createConsumer(new ActiveMQQueue("Orders.IBM"));
MessageConsumer consumer2 = session.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 100");
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer3 = session2.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 200");
}
else {
// Lets wait for the broker
broker.waitUntilStopped();
}
}
catch (Exception e) {
System.out.println("Failed: " + e);
e.printStackTrace();
}
}
}