diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java index 7a53366e00..d98b366148 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java @@ -78,6 +78,7 @@ public class BrokerRegistry { public void bind(String brokerName, BrokerService broker) { synchronized (mutex) { brokers.put(brokerName, broker); + mutex.notifyAll(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index cfa7d86caa..fe8ff40c0f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -445,7 +445,6 @@ public class BrokerService implements Service { processHelperProperties(); - BrokerRegistry.getInstance().bind(getBrokerName(), this); getPersistenceAdapter().setUsageManager(getProducerSystemUsage()); @@ -465,7 +464,8 @@ public class BrokerService implements Service { } getBroker().start(); - + BrokerRegistry.getInstance().bind(getBrokerName(), this); + // see if there is a MasterBroker service and if so, configure // it and start it. for (Service service : services) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java index b831a2084d..1cffafce79 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java @@ -58,6 +58,7 @@ public class VMTransportFactory extends TransportFactory { String host; Map options; boolean create = true; + int waitForStart = -1; CompositeData data = URISupport.parseComposite(location); if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) { brokerURI = data.getComponents()[0]; @@ -88,6 +89,10 @@ public class VMTransportFactory extends TransportFactory { if ("false".equals(options.remove("create"))) { create = false; } + String waitForStartString = options.remove("waitForStart"); + if (waitForStartString != null) { + waitForStart = Integer.parseInt(waitForStartString); + } } catch (URISyntaxException e1) { throw IOExceptionSupport.create(e1); } @@ -102,10 +107,9 @@ public class VMTransportFactory extends TransportFactory { BrokerService broker = null; // Synchronize on the registry so that multiple concurrent threads // doing this do not think that the broker has not been created and - // cause multiple - // brokers to be started. + // cause multiple brokers to be started. synchronized (BrokerRegistry.getInstance().getRegistryMutext()) { - broker = BrokerRegistry.getInstance().lookup(host); + broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart); if (broker == null) { if (!create) { throw new IOException("Broker named '" + host + "' does not exist."); @@ -121,6 +125,7 @@ public class VMTransportFactory extends TransportFactory { throw IOExceptionSupport.create(e); } BROKERS.put(host, broker); + BrokerRegistry.getInstance().getRegistryMutext().notifyAll(); } server = SERVERS.get(host); @@ -152,6 +157,32 @@ public class VMTransportFactory extends TransportFactory { return transport; } + /** + * @param registry + * @param brokerName + * @param waitForStart - time in milliseconds to wait for a broker to appear + * @return + */ + private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) { + BrokerService broker = null; + synchronized(registry.getRegistryMutext()) { + broker = registry.lookup(brokerName); + if (broker == null && waitForStart > 0) { + final long expiry = System.currentTimeMillis() + waitForStart; + while (broker == null && expiry > System.currentTimeMillis()) { + long timeout = Math.max(0, expiry - System.currentTimeMillis()); + try { + LOG.debug("waiting for broker named: " + brokerName + " to start"); + registry.getRegistryMutext().wait(timeout); + } catch (InterruptedException ignored) { + } + broker = registry.lookup(brokerName); + } + } + } + return broker; + } + public TransportServer doBind(URI location) throws IOException { return bind(location, false); } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java b/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java index 36c29282ad..f85bdba8d9 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java @@ -38,4 +38,8 @@ public class MessageSender { session.commit(); } } + + public MessageProducer getProducer() { + return producer; + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java b/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java index 8aced23525..bf14a05bf2 100755 --- a/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java @@ -79,7 +79,7 @@ public class ProxyConnectorTest extends ProxyTestSupport { // Give broker enough time to receive and register the consumer info // Either that or make consumer retroactive try { - Thread.sleep(1000); + Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java new file mode 100644 index 0000000000..e6bb9510e9 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java @@ -0,0 +1,60 @@ +package org.apache.activemq.transport.vm; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.JMSException; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; + +public class VMTransportWaitForTest extends TestCase { + + private static final String VM_BROKER_URI_NO_WAIT = + "vm://localhost?broker.persistent=false&create=false"; + + private static final String VM_BROKER_URI_WAIT_FOR_START = + VM_BROKER_URI_NO_WAIT + "&waitForStart=20000"; + + CountDownLatch started = new CountDownLatch(1); + CountDownLatch gotConnection = new CountDownLatch(1); + + public void testWaitFor() throws Exception { + try { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_NO_WAIT)); + cf.createConnection(); + fail("expect broker not exist exception"); + } catch (JMSException expectedOnNoBrokerAndNoCreate) { + } + + // spawn a thread that will wait for an embedded broker to start via vm://.. + Thread t = new Thread() { + public void run() { + try { + started.countDown(); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_WAIT_FOR_START)); + cf.createConnection(); + gotConnection.countDown(); + + } catch (Exception e) { + e.printStackTrace(); + fail("unexpected exception:" + e); + } + } + }; + t.start(); + started.await(20, TimeUnit.SECONDS); + Thread.yield(); + assertFalse("has not got connection", gotConnection.await(2, TimeUnit.SECONDS)); + + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.addConnector("tcp://localhost:61616"); + broker.start(); + assertTrue("has got connection", gotConnection.await(200, TimeUnit.MILLISECONDS)); + broker.stop(); + } +} diff --git a/assembly/src/release/conf/activemq.xml b/assembly/src/release/conf/activemq.xml index c404165cd5..09a4dfe03d 100755 --- a/assembly/src/release/conf/activemq.xml +++ b/assembly/src/release/conf/activemq.xml @@ -113,6 +113,10 @@ ** ** http://activemq.apache.org/enterprise-integration-patterns.html --> + + + +