diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index e7aa72979b..d90473b2db 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -100,11 +100,23 @@ public class VMTransport implements Transport, Task { if (!peer.started.get()) { LinkedBlockingQueue pending = peer.getMessageQueue(); + int sleepTimeMillis; boolean accepted = false; do { + sleepTimeMillis = 0; + // the pending queue is drained on start so we need to ensure we add before + // the drain commences, otherwise we never get the command dispatched! synchronized (peer.started) { - accepted = pending.offer(command); + if (!peer.started.get()) { + accepted = pending.offer(command); + if (!accepted) { + sleepTimeMillis = 500; + } + } } + // give start thread a chance if we will loop + TimeUnit.MILLISECONDS.sleep(sleepTimeMillis); + } while (!accepted && !peer.started.get()); if (accepted) { return; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java index 5183db8e92..50a81f66a7 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java @@ -19,12 +19,11 @@ package org.apache.activemq.transport.vm; import java.io.IOException; import java.net.URI; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; - import javax.jms.Connection; - -import junit.framework.TestCase; - +import org.junit.Test; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerRegistry; @@ -33,66 +32,92 @@ import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; -public class VMTransportBrokerNameTest extends TestCase { +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class VMTransportBrokerNameTest { private static final String MY_BROKER = "myBroker"; final String vmUrl = "vm:(broker:(tcp://localhost:61616)/" + MY_BROKER + "?persistent=false)"; + @Test public void testBrokerName() throws Exception { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(vmUrl)); ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection(); assertTrue("Transport has name in it: " + c1.getTransport(), c1.getTransport().toString().contains(MY_BROKER)); - + // verify Broker is there with name ActiveMQConnectionFactory cfbyName = new ActiveMQConnectionFactory(new URI("vm://" + MY_BROKER + "?create=false")); Connection c2 = cfbyName.createConnection(); - + assertNotNull(BrokerRegistry.getInstance().lookup(MY_BROKER)); assertEquals(BrokerRegistry.getInstance().findFirst().getBrokerName(), MY_BROKER); assertEquals(BrokerRegistry.getInstance().getBrokers().size(), 1); - + c1.close(); c2.close(); } - public void testBrokerInfoClientAsync() throws Exception { + @Test + public void testBrokerInfoReceiptClientAsync() throws Exception { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(vmUrl)); ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection(); - assertTrue("Transport has name in it: " + c1.getTransport(), c1.getTransport().toString().contains(MY_BROKER)); - for (int i=0;i<20; i++) { - final CountDownLatch gotBrokerInfo = new CountDownLatch(1); - Transport transport = TransportFactory.connect(new URI("vm://" + MY_BROKER + "?async=false")); - transport.setTransportListener(new TransportListener() { + final int numIterations = 400; + final CountDownLatch successLatch = new CountDownLatch(numIterations); + ExecutorService executor = Executors.newFixedThreadPool(100); + for (int i = 0; i < numIterations; i++) { + executor.submit(new Runnable() { @Override - public void onCommand(Object command) { - if (command instanceof BrokerInfo) { - gotBrokerInfo.countDown(); + public void run() { + try { + verifyBrokerInfo(successLatch); + } catch (Exception ignored) { + ignored.printStackTrace(); } } - - @Override - public void onException(IOException error) { - - } - - @Override - public void transportInterupted() { - - } - - @Override - public void transportResumed() { - - } }); - transport.start(); - - assertTrue("got broker info on iteration:" + i, gotBrokerInfo.await(5, TimeUnit.SECONDS)); - - transport.stop(); } + + executor.shutdown(); + executor.awaitTermination(20, TimeUnit.SECONDS); c1.close(); + + assertTrue("all success: " + successLatch.getCount(), successLatch.await(1, TimeUnit.SECONDS)); + } + + public void verifyBrokerInfo(CountDownLatch success) throws Exception { + final CountDownLatch gotBrokerInfo = new CountDownLatch(1); + Transport transport = TransportFactory.connect(new URI("vm://" + MY_BROKER + "?async=false")); + transport.setTransportListener(new TransportListener() { + @Override + public void onCommand(Object command) { + if (command instanceof BrokerInfo) { + gotBrokerInfo.countDown(); + } + } + + @Override + public void onException(IOException error) { + + } + + @Override + public void transportInterupted() { + + } + + @Override + public void transportResumed() { + + } + }); + transport.start(); + if (gotBrokerInfo.await(5, TimeUnit.SECONDS)) { + success.countDown(); + } + transport.stop(); } }