diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 7bdd90eeff..56ed5be84f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; @@ -145,6 +146,7 @@ public class BrokerService implements Service, Serializable { private URI vmConnectorURI; private PolicyMap destinationPolicy; private AtomicBoolean started = new AtomicBoolean(false); + private AtomicBoolean stopped = new AtomicBoolean(false); private BrokerPlugin[] plugins; private boolean keepDurableSubsActive=true; private boolean useVirtualTopics=true; @@ -154,9 +156,8 @@ public class BrokerService implements Service, Serializable { private Store tempDataStore; private int persistenceThreadPriority = Thread.MAX_PRIORITY; private boolean useLocalHostBrokerName = false; - + private CountDownLatch stoppedLatch = new CountDownLatch(1); - /** * Adds a new transport connector for the given bind address * @@ -471,33 +472,27 @@ public class BrokerService implements Service, Serializable { // to avoid timimg issue with discovery (spinning up a new instance) BrokerRegistry.getInstance().unbind(getBrokerName()); VMTransportFactory.stopped(getBrokerName()); + stopped.set(true); + stoppedLatch.countDown(); + log.info("ActiveMQ JMS Message Broker ("+getBrokerName()+", "+brokerId+") stopped"); stopper.throwFirstException(); } - protected void stopAllConnectors(ServiceStopper stopper) { - - for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) { - NetworkConnector connector = (NetworkConnector) iter.next(); - unregisterNetworkConnectorMBean(connector); - stopper.stop(connector); + /** + * A helper method to block the caller thread until the broker has been stopped + */ + public void waitUntilStopped() { + while (!stopped.get()) { + try { + stoppedLatch.await(); + } + catch (InterruptedException e) { + // ignore + } } + } - for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) { - ProxyConnector connector = (ProxyConnector) iter.next(); - stopper.stop(connector); - } - - for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) { - JmsConnector connector = (JmsConnector) iter.next(); - stopper.stop(connector); - } - - for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) { - TransportConnector connector = (TransportConnector) iter.next(); - stopper.stop(connector); - } - } // Properties // ------------------------------------------------------------------------- @@ -1123,6 +1118,30 @@ public class BrokerService implements Service, Serializable { } } + protected void stopAllConnectors(ServiceStopper stopper) { + + for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) { + NetworkConnector connector = (NetworkConnector) iter.next(); + unregisterNetworkConnectorMBean(connector); + stopper.stop(connector); + } + + for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) { + ProxyConnector connector = (ProxyConnector) iter.next(); + stopper.stop(connector); + } + + for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) { + JmsConnector connector = (JmsConnector) iter.next(); + stopper.stop(connector); + } + + for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) { + TransportConnector connector = (TransportConnector) iter.next(); + stopper.stop(connector); + } + } + protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { MBeanServer mbeanServer = getManagementContext().getMBeanServer(); if (mbeanServer != null) { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/Main.java b/activemq-core/src/test/java/org/apache/activemq/broker/Main.java index f26d650b7b..40114b538f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/Main.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/Main.java @@ -18,12 +18,7 @@ package org.apache.activemq.broker; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.ManagementContext; -import org.apache.activemq.broker.util.UDPTraceBrokerPlugin; -import org.apache.activemq.broker.view.ConnectionDotFilePlugin; -import org.apache.activemq.broker.view.DestinationDotFilePlugin; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.demo.DefaultQueueSender; @@ -34,10 +29,11 @@ import javax.jms.Session; /** * A helper class which can be handy for running a broker in your IDE from the * activemq-core module. - * + * * @version $Revision$ */ public class Main { + protected static boolean createConsumers = false; /** * @param args @@ -66,23 +62,29 @@ public class Main { broker.addConnector("stomp://localhost:61613"); broker.start(); - // lets create a dummy couple of consumers - Connection connection = new ActiveMQConnectionFactory().createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer1 = session.createConsumer(new ActiveMQQueue("Orders.IBM")); - MessageConsumer consumer2 = session.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 100"); - Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer3 = session2.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 200"); // lets publish some messages so that there is some stuff to browse - DefaultQueueSender.main(new String[] { "Prices.Equity.IBM" }); - DefaultQueueSender.main(new String[] { "Prices.Equity.MSFT" }); + DefaultQueueSender.main(new String[]{"Prices.Equity.IBM"}); + DefaultQueueSender.main(new String[]{"Prices.Equity.MSFT"}); + + // lets create a dummy couple of consumers + if (createConsumers) { + Connection connection = new ActiveMQConnectionFactory().createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = session.createConsumer(new ActiveMQQueue("Orders.IBM")); + MessageConsumer consumer2 = session.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 100"); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer3 = session2.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 200"); + } + else { + // Lets wait for the broker + broker.waitUntilStopped(); + } } catch (Exception e) { System.out.println("Failed: " + e); e.printStackTrace(); } } - }