From 4af6f4018656e01989136042a8ad4ec1ab64c137 Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 3 Oct 2019 11:08:05 +0100 Subject: [PATCH] AMQ-6494 is related, fix intermittent failure of RedeliveryPolicyTest related to vm transport server being shutdown while in use via async onException handler --- .../transport/vm/VMTransportServer.java | 9 +++-- .../apache/activemq/RedeliveryPolicyTest.java | 39 +++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java index 2f3d5191f7..8bef1cc46b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.vm; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.command.BrokerInfo; @@ -35,7 +36,7 @@ public class VMTransportServer implements TransportServer { private TransportAcceptListener acceptListener; private final URI location; - private boolean disposed; + private AtomicBoolean disposed = new AtomicBoolean(false); private final AtomicInteger connectionCount = new AtomicInteger(0); private final boolean disposeOnDisconnect; @@ -64,7 +65,7 @@ public class VMTransportServer implements TransportServer { public VMTransport connect() throws IOException { TransportAcceptListener al; synchronized (this) { - if (disposed) { + if (disposed.get()) { throw new IOException("Server has been disposed."); } al = acceptListener; @@ -117,7 +118,9 @@ public class VMTransportServer implements TransportServer { } public void stop() throws IOException { - VMTransportFactory.stopped(this); + if (disposed.compareAndSet(false, true)) { + VMTransportFactory.stopped(this); + } } public URI getConnectURI() { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java index 5af3a37cf8..a0a1ca8e5d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -600,6 +600,45 @@ public class RedeliveryPolicyTest extends JmsTestSupport { } + public void testRepeatedServerClose() throws Exception { + + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + session.commit(); + + final int maxRedeliveries = 10000; + for (int i=0;i<=maxRedeliveries + 1;i++) { + + final ActiveMQConnection toTest = (ActiveMQConnection)factory.createConnection(userName, password); + toTest.start(); + + // abortive close via broker + for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) { + transportServer.stop(); + } + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return toTest.isTransportFailed(); + } + },10000, 100 ); + + try { + toTest.close(); + } catch (Exception expected) { + } finally { + } + } + } + + + public void testRepeatedRedeliveryOnMessageNoCommit() throws Exception { connection.start();