fix intermittent failure of FailoverStaticNetworkTest. Bridge fails to start triggerStartAsyncNetworkBridgeCreation thread waiting on localBrokerInfo. The command was dropped due to contention between dispatch and peer start. Fix and test. Relates to https://issues.apache.org/jira/browse/AMQ-3684

This commit is contained in:
gtully 2015-05-12 22:10:26 +01:00
parent 3ef5389691
commit c89bb7a316
3 changed files with 61 additions and 10 deletions

View File

@ -92,12 +92,24 @@ public class VMTransport implements Transport, Task {
throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
}
if (peer.async || !peer.started.get()) {
if (peer.async) {
peer.getMessageQueue().put(command);
peer.wakeup();
return;
}
if (!peer.started.get()) {
LinkedBlockingQueue<Object> pending = peer.getMessageQueue();
boolean accepted = false;
do {
synchronized (peer.started) {
accepted = pending.offer(command);
}
} while (!accepted && !peer.started.get());
if (accepted) {
return;
}
}
} catch (InterruptedException e) {
InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
iioe.initCause(e);
@ -259,14 +271,6 @@ public class VMTransport implements Transport, Task {
this.transportListener = commandListener;
}
public void setMessageQueue(LinkedBlockingQueue<Object> asyncQueue) {
synchronized (this) {
if (messageQueue == null) {
messageQueue = asyncQueue;
}
}
}
public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
LinkedBlockingQueue<Object> result = messageQueue;
if (result == null) {

View File

@ -583,7 +583,6 @@
<exclude>**/amq1490/*</exclude>
<exclude>**/archive/*</exclude>
<exclude>**/NetworkFailoverTest.*/**</exclude>
<exclude>**/vm/VMTransportBrokerTest.*</exclude>
<exclude>**/broker/MarshallingBrokerTest.*</exclude>
<exclude>**/AMQDeadlockTest3.*</exclude>
<!-- https://issues.apache.org/activemq/browse/AMQ-2050 -->

View File

@ -16,7 +16,10 @@
*/
package org.apache.activemq.transport.vm;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
@ -25,6 +28,10 @@ import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
public class VMTransportBrokerNameTest extends TestCase {
@ -47,4 +54,45 @@ public class VMTransportBrokerNameTest extends TestCase {
c1.close();
c2.close();
}
public void testBrokerInfoClientAsync() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(vmUrl));
ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection();
assertTrue("Transport has name in it: " + c1.getTransport(), c1.getTransport().toString().contains(MY_BROKER));
for (int i=0;i<20; i++) {
final CountDownLatch gotBrokerInfo = new CountDownLatch(1);
Transport transport = TransportFactory.connect(new URI("vm://" + MY_BROKER + "?async=false"));
transport.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
if (command instanceof BrokerInfo) {
gotBrokerInfo.countDown();
}
}
@Override
public void onException(IOException error) {
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
});
transport.start();
assertTrue("got broker info on iteration:" + i, gotBrokerInfo.await(5, TimeUnit.SECONDS));
transport.stop();
}
c1.close();
}
}