add waitForStart option to vm url, fix AMQ-1895

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@687280 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-08-20 08:07:48 +00:00
parent 947659cd52
commit 43880af18a
7 changed files with 106 additions and 6 deletions

View File

@ -78,6 +78,7 @@ public class BrokerRegistry {
public void bind(String brokerName, BrokerService broker) { public void bind(String brokerName, BrokerService broker) {
synchronized (mutex) { synchronized (mutex) {
brokers.put(brokerName, broker); brokers.put(brokerName, broker);
mutex.notifyAll();
} }
} }

View File

@ -445,7 +445,6 @@ public class BrokerService implements Service {
processHelperProperties(); processHelperProperties();
BrokerRegistry.getInstance().bind(getBrokerName(), this);
getPersistenceAdapter().setUsageManager(getProducerSystemUsage()); getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
@ -465,7 +464,8 @@ public class BrokerService implements Service {
} }
getBroker().start(); getBroker().start();
BrokerRegistry.getInstance().bind(getBrokerName(), this);
// see if there is a MasterBroker service and if so, configure // see if there is a MasterBroker service and if so, configure
// it and start it. // it and start it.
for (Service service : services) { for (Service service : services) {

View File

@ -58,6 +58,7 @@ public class VMTransportFactory extends TransportFactory {
String host; String host;
Map<String, String> options; Map<String, String> options;
boolean create = true; boolean create = true;
int waitForStart = -1;
CompositeData data = URISupport.parseComposite(location); CompositeData data = URISupport.parseComposite(location);
if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) { if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) {
brokerURI = data.getComponents()[0]; brokerURI = data.getComponents()[0];
@ -88,6 +89,10 @@ public class VMTransportFactory extends TransportFactory {
if ("false".equals(options.remove("create"))) { if ("false".equals(options.remove("create"))) {
create = false; create = false;
} }
String waitForStartString = options.remove("waitForStart");
if (waitForStartString != null) {
waitForStart = Integer.parseInt(waitForStartString);
}
} catch (URISyntaxException e1) { } catch (URISyntaxException e1) {
throw IOExceptionSupport.create(e1); throw IOExceptionSupport.create(e1);
} }
@ -102,10 +107,9 @@ public class VMTransportFactory extends TransportFactory {
BrokerService broker = null; BrokerService broker = null;
// Synchronize on the registry so that multiple concurrent threads // Synchronize on the registry so that multiple concurrent threads
// doing this do not think that the broker has not been created and // doing this do not think that the broker has not been created and
// cause multiple // cause multiple brokers to be started.
// brokers to be started.
synchronized (BrokerRegistry.getInstance().getRegistryMutext()) { synchronized (BrokerRegistry.getInstance().getRegistryMutext()) {
broker = BrokerRegistry.getInstance().lookup(host); broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart);
if (broker == null) { if (broker == null) {
if (!create) { if (!create) {
throw new IOException("Broker named '" + host + "' does not exist."); throw new IOException("Broker named '" + host + "' does not exist.");
@ -121,6 +125,7 @@ public class VMTransportFactory extends TransportFactory {
throw IOExceptionSupport.create(e); throw IOExceptionSupport.create(e);
} }
BROKERS.put(host, broker); BROKERS.put(host, broker);
BrokerRegistry.getInstance().getRegistryMutext().notifyAll();
} }
server = SERVERS.get(host); server = SERVERS.get(host);
@ -152,6 +157,32 @@ public class VMTransportFactory extends TransportFactory {
return transport; return transport;
} }
/**
* @param registry
* @param brokerName
* @param waitForStart - time in milliseconds to wait for a broker to appear
* @return
*/
private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) {
BrokerService broker = null;
synchronized(registry.getRegistryMutext()) {
broker = registry.lookup(brokerName);
if (broker == null && waitForStart > 0) {
final long expiry = System.currentTimeMillis() + waitForStart;
while (broker == null && expiry > System.currentTimeMillis()) {
long timeout = Math.max(0, expiry - System.currentTimeMillis());
try {
LOG.debug("waiting for broker named: " + brokerName + " to start");
registry.getRegistryMutext().wait(timeout);
} catch (InterruptedException ignored) {
}
broker = registry.lookup(brokerName);
}
}
}
return broker;
}
public TransportServer doBind(URI location) throws IOException { public TransportServer doBind(URI location) throws IOException {
return bind(location, false); return bind(location, false);
} }

View File

@ -38,4 +38,8 @@ public class MessageSender {
session.commit(); session.commit();
} }
} }
public MessageProducer getProducer() {
return producer;
}
} }

View File

@ -79,7 +79,7 @@ public class ProxyConnectorTest extends ProxyTestSupport {
// Give broker enough time to receive and register the consumer info // Give broker enough time to receive and register the consumer info
// Either that or make consumer retroactive // Either that or make consumer retroactive
try { try {
Thread.sleep(1000); Thread.sleep(2000);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }

View File

@ -0,0 +1,60 @@
package org.apache.activemq.transport.vm;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
public class VMTransportWaitForTest extends TestCase {
private static final String VM_BROKER_URI_NO_WAIT =
"vm://localhost?broker.persistent=false&create=false";
private static final String VM_BROKER_URI_WAIT_FOR_START =
VM_BROKER_URI_NO_WAIT + "&waitForStart=20000";
CountDownLatch started = new CountDownLatch(1);
CountDownLatch gotConnection = new CountDownLatch(1);
public void testWaitFor() throws Exception {
try {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_NO_WAIT));
cf.createConnection();
fail("expect broker not exist exception");
} catch (JMSException expectedOnNoBrokerAndNoCreate) {
}
// spawn a thread that will wait for an embedded broker to start via vm://..
Thread t = new Thread() {
public void run() {
try {
started.countDown();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_WAIT_FOR_START));
cf.createConnection();
gotConnection.countDown();
} catch (Exception e) {
e.printStackTrace();
fail("unexpected exception:" + e);
}
}
};
t.start();
started.await(20, TimeUnit.SECONDS);
Thread.yield();
assertFalse("has not got connection", gotConnection.await(2, TimeUnit.SECONDS));
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.addConnector("tcp://localhost:61616");
broker.start();
assertTrue("has got connection", gotConnection.await(200, TimeUnit.MILLISECONDS));
broker.stop();
}
}

View File

@ -113,6 +113,10 @@
** **
** http://activemq.apache.org/enterprise-integration-patterns.html ** http://activemq.apache.org/enterprise-integration-patterns.html
--> -->
<!-- configure the camel activemq component to use the current broker -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
<property name="brokerURL" value="vm://localhost?create=false&amp;waitForStart=10000" />
</bean>
<camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring"> <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
<!-- You can use a <package> element for each root package to search for Java routes --> <!-- You can use a <package> element for each root package to search for Java routes -->