Ensure that wait for started on vm transport factory actually waits for
start, currently it doesn't really check started or wait porperly.
This commit is contained in:
Timothy Bish 2014-06-09 15:40:03 -04:00
parent 77713d9d1a
commit 5016c4d4f2
3 changed files with 99 additions and 11 deletions

View File

@ -921,8 +921,21 @@ public class BrokerService implements Service {
* @return boolean true if wait succeeded false if broker was not started or was stopped * @return boolean true if wait succeeded false if broker was not started or was stopped
*/ */
public boolean waitUntilStarted() { public boolean waitUntilStarted() {
return waitUntilStarted(Long.MAX_VALUE);
}
/**
* A helper method to block the caller thread until the broker has fully started
*
* @param timeout
* the amount of time to wait before giving up and returning false.
*
* @return boolean true if wait succeeded false if broker was not started or was stopped
*/
public boolean waitUntilStarted(long timeout) {
boolean waitSucceeded = isStarted(); boolean waitSucceeded = isStarted();
while (!isStarted() && !stopped.get() && !waitSucceeded) { long expiration = Math.max(0, timeout + System.currentTimeMillis());
while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
try { try {
if (startException != null) { if (startException != null) {
return waitSucceeded; return waitSucceeded;

View File

@ -42,18 +42,20 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC; import org.slf4j.MDC;
public class VMTransportFactory extends TransportFactory { public class VMTransportFactory extends TransportFactory {
public static final ConcurrentHashMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>(); public static final ConcurrentHashMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>();
public static final ConcurrentHashMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>(); public static final ConcurrentHashMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>();
public static final ConcurrentHashMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>(); public static final ConcurrentHashMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>();
private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class); private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class);
BrokerFactoryHandler brokerFactoryHandler; BrokerFactoryHandler brokerFactoryHandler;
@Override
public Transport doConnect(URI location) throws Exception { public Transport doConnect(URI location) throws Exception {
return VMTransportServer.configure(doCompositeConnect(location)); return VMTransportServer.configure(doCompositeConnect(location));
} }
@Override
public Transport doCompositeConnect(URI location) throws Exception { public Transport doCompositeConnect(URI location) throws Exception {
URI brokerURI; URI brokerURI;
String host; String host;
@ -64,7 +66,7 @@ public class VMTransportFactory extends TransportFactory {
if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) { if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) {
brokerURI = data.getComponents()[0]; brokerURI = data.getComponents()[0];
CompositeData brokerData = URISupport.parseComposite(brokerURI); CompositeData brokerData = URISupport.parseComposite(brokerURI);
host = (String)brokerData.getParameters().get("brokerName"); host = brokerData.getParameters().get("brokerName");
if (host == null) { if (host == null) {
host = "localhost"; host = "localhost";
} }
@ -79,7 +81,7 @@ public class VMTransportFactory extends TransportFactory {
try { try {
host = extractHost(location); host = extractHost(location);
options = URISupport.parseParameters(location); options = URISupport.parseParameters(location);
String config = (String)options.remove("brokerConfig"); String config = options.remove("brokerConfig");
if (config != null) { if (config != null) {
brokerURI = new URI(config); brokerURI = new URI(config);
} else { } else {
@ -170,32 +172,50 @@ public class VMTransportFactory extends TransportFactory {
return host; return host;
} }
/** /**
* Attempt to find a Broker instance.
*
* @param registry * @param registry
* the registry in which to search for the BrokerService instance.
* @param brokerName * @param brokerName
* @param waitForStart - time in milliseconds to wait for a broker to appear * the name of the Broker that should be located.
* @return * @param waitForStart
* time in milliseconds to wait for a broker to appear and be started.
*
* @return a BrokerService instance if one is found, or null.
*/ */
private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) { private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) {
BrokerService broker = null; BrokerService broker = null;
synchronized(registry.getRegistryMutext()) { synchronized(registry.getRegistryMutext()) {
broker = registry.lookup(brokerName); broker = registry.lookup(brokerName);
if (broker == null && waitForStart > 0) { if (broker == null || waitForStart > 0) {
final long expiry = System.currentTimeMillis() + waitForStart; final long expiry = System.currentTimeMillis() + waitForStart;
while ((broker == null || !broker.isStarted()) && expiry > System.currentTimeMillis()) { while ((broker == null || !broker.isStarted()) && expiry > System.currentTimeMillis()) {
long timeout = Math.max(0, expiry - System.currentTimeMillis()); long timeout = Math.max(0, expiry - System.currentTimeMillis());
try { try {
LOG.debug("waiting for broker named: " + brokerName + " to start"); LOG.debug("waiting for broker named: " + brokerName + " to enter registry");
registry.getRegistryMutext().wait(timeout); registry.getRegistryMutext().wait(timeout);
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
broker = registry.lookup(brokerName); broker = registry.lookup(brokerName);
if (broker != null && !broker.isStarted()) {
LOG.debug("waiting for broker named: " + brokerName + " to start");
timeout = Math.max(0, expiry - System.currentTimeMillis());
// Wait for however long we have left for broker to be started, if
// it doesn't get started we need to clear broker so it doesn't get
// returned. A null return should throw an exception.
if (!broker.waitUntilStarted(timeout)) {
broker = null;
break;
}
}
} }
} }
} }
return broker; return broker;
} }
@Override
public TransportServer doBind(URI location) throws IOException { public TransportServer doBind(URI location) throws IOException {
return bind(location, false); return bind(location, false);
} }

View File

@ -20,6 +20,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -27,20 +28,34 @@ import java.util.concurrent.TimeUnit;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
import org.mortbay.log.Log;
public class VMTransportWaitForTest { public class VMTransportWaitForTest {
private static final int WAIT_TIME = 20000;
private static final int SHORT_WAIT_TIME = 5000;
private static final String VM_BROKER_URI_NO_WAIT = private static final String VM_BROKER_URI_NO_WAIT =
"vm://localhost?broker.persistent=false&create=false"; "vm://localhost?broker.persistent=false&create=false";
private static final String VM_BROKER_URI_WAIT_FOR_START = private static final String VM_BROKER_URI_WAIT_FOR_START =
VM_BROKER_URI_NO_WAIT + "&waitForStart=20000"; VM_BROKER_URI_NO_WAIT + "&waitForStart=" + WAIT_TIME;
private static final String VM_BROKER_URI_SHORT_WAIT_FOR_START =
VM_BROKER_URI_NO_WAIT + "&waitForStart=" + SHORT_WAIT_TIME;
CountDownLatch started = new CountDownLatch(1); CountDownLatch started = new CountDownLatch(1);
CountDownLatch gotConnection = new CountDownLatch(1); CountDownLatch gotConnection = new CountDownLatch(1);
@After
public void after() throws IOException {
BrokerRegistry.getInstance().unbind("localhost");
}
@Test(timeout=90000) @Test(timeout=90000)
public void testWaitFor() throws Exception { public void testWaitFor() throws Exception {
try { try {
@ -77,4 +92,44 @@ public class VMTransportWaitForTest {
assertTrue("has got connection", gotConnection.await(400, TimeUnit.MILLISECONDS)); assertTrue("has got connection", gotConnection.await(400, TimeUnit.MILLISECONDS));
broker.stop(); broker.stop();
} }
@Test(timeout=90000)
public void testWaitForNoBrokerInRegistry() throws Exception {
long startTime = System.currentTimeMillis();
try {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START));
cf.createConnection();
fail("expect broker not exist exception");
} catch (JMSException expectedOnNoBrokerAndNoCreate) {
}
long endTime = System.currentTimeMillis();
Log.info("Total wait time was: {}", endTime - startTime);
assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100);
}
@Test(timeout=90000)
public void testWaitForNotStartedButInRegistry() throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
BrokerRegistry.getInstance().bind("localhost", broker);
long startTime = System.currentTimeMillis();
try {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START));
cf.createConnection();
fail("expect broker not exist exception");
} catch (JMSException expectedOnNoBrokerAndNoCreate) {
}
long endTime = System.currentTimeMillis();
Log.info("Total wait time was: {}", endTime - startTime);
assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100);
}
} }