AMQ-6494 is related, fix intermittent failure of RedeliveryPolicyTest related to vm transport server being shutdown while in use via async onException handler

This commit is contained in:
gtully 2019-10-03 11:08:05 +01:00
parent 1ab6793c85
commit 4af6f40186
2 changed files with 45 additions and 3 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.vm;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.BrokerInfo;
@ -35,7 +36,7 @@ public class VMTransportServer implements TransportServer {
private TransportAcceptListener acceptListener;
private final URI location;
private boolean disposed;
private AtomicBoolean disposed = new AtomicBoolean(false);
private final AtomicInteger connectionCount = new AtomicInteger(0);
private final boolean disposeOnDisconnect;
@ -64,7 +65,7 @@ public class VMTransportServer implements TransportServer {
public VMTransport connect() throws IOException {
TransportAcceptListener al;
synchronized (this) {
if (disposed) {
if (disposed.get()) {
throw new IOException("Server has been disposed.");
}
al = acceptListener;
@ -117,7 +118,9 @@ public class VMTransportServer implements TransportServer {
}
public void stop() throws IOException {
VMTransportFactory.stopped(this);
if (disposed.compareAndSet(false, true)) {
VMTransportFactory.stopped(this);
}
}
public URI getConnectURI() {

View File

@ -600,6 +600,45 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
}
public void testRepeatedServerClose() throws Exception {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = session.createProducer(destination);
// Send the messages
producer.send(session.createTextMessage("1st"));
session.commit();
final int maxRedeliveries = 10000;
for (int i=0;i<=maxRedeliveries + 1;i++) {
final ActiveMQConnection toTest = (ActiveMQConnection)factory.createConnection(userName, password);
toTest.start();
// abortive close via broker
for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) {
transportServer.stop();
}
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return toTest.isTransportFailed();
}
},10000, 100 );
try {
toTest.close();
} catch (Exception expected) {
} finally {
}
}
}
public void testRepeatedRedeliveryOnMessageNoCommit() throws Exception {
connection.start();