diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index ec1a423cac..e7aa72979b 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -92,12 +92,24 @@ public class VMTransport implements Transport, Task { throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."); } - if (peer.async || !peer.started.get()) { + if (peer.async) { peer.getMessageQueue().put(command); peer.wakeup(); return; } + if (!peer.started.get()) { + LinkedBlockingQueue pending = peer.getMessageQueue(); + boolean accepted = false; + do { + synchronized (peer.started) { + accepted = pending.offer(command); + } + } while (!accepted && !peer.started.get()); + if (accepted) { + return; + } + } } catch (InterruptedException e) { InterruptedIOException iioe = new InterruptedIOException(e.getMessage()); iioe.initCause(e); @@ -259,14 +271,6 @@ public class VMTransport implements Transport, Task { this.transportListener = commandListener; } - public void setMessageQueue(LinkedBlockingQueue asyncQueue) { - synchronized (this) { - if (messageQueue == null) { - messageQueue = asyncQueue; - } - } - } - public LinkedBlockingQueue getMessageQueue() throws TransportDisposedIOException { LinkedBlockingQueue result = messageQueue; if (result == null) { diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index 3ca9b1a0da..7ea4ce3263 100755 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -583,7 +583,6 @@ **/amq1490/* **/archive/* **/NetworkFailoverTest.*/** - **/vm/VMTransportBrokerTest.* **/broker/MarshallingBrokerTest.* **/AMQDeadlockTest3.* diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java index ac0539317a..5183db8e92 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java @@ -16,7 +16,10 @@ */ package org.apache.activemq.transport.vm; +import java.io.IOException; import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; @@ -25,6 +28,10 @@ import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportListener; public class VMTransportBrokerNameTest extends TestCase { @@ -47,4 +54,45 @@ public class VMTransportBrokerNameTest extends TestCase { c1.close(); c2.close(); } + + public void testBrokerInfoClientAsync() throws Exception { + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(vmUrl)); + ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection(); + assertTrue("Transport has name in it: " + c1.getTransport(), c1.getTransport().toString().contains(MY_BROKER)); + + for (int i=0;i<20; i++) { + final CountDownLatch gotBrokerInfo = new CountDownLatch(1); + Transport transport = TransportFactory.connect(new URI("vm://" + MY_BROKER + "?async=false")); + transport.setTransportListener(new TransportListener() { + @Override + public void onCommand(Object command) { + if (command instanceof BrokerInfo) { + gotBrokerInfo.countDown(); + } + } + + @Override + public void onException(IOException error) { + + } + + @Override + public void transportInterupted() { + + } + + @Override + public void transportResumed() { + + } + }); + transport.start(); + + assertTrue("got broker info on iteration:" + i, gotBrokerInfo.await(5, TimeUnit.SECONDS)); + + transport.stop(); + } + c1.close(); + } }