diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java index e8122d08f3..80597e3706 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java @@ -35,6 +35,8 @@ import javax.jms.TopicSession; import java.lang.ref.WeakReference; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -48,6 +50,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.version.Version; import org.apache.activemq.artemis.reader.MessageUtil; +import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.VersionLoader; @@ -113,6 +116,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme private final FailoverEventListener failoverListener = new FailoverEventListenerImpl(this); + private final ExecutorService failoverListenerExecutor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()); + private final Version thisVersion; private final int dupsOKBatchSize; @@ -353,6 +358,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme } } + failoverListenerExecutor.shutdown(); + closed = true; } catch (ActiveMQException e) { @@ -759,12 +766,12 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme if (failoverListener != null) { - new Thread(new Runnable() { + conn.failoverListenerExecutor.execute(new Runnable() { @Override public void run() { failoverListener.failoverEvent(eventType); } - }).start(); + }); } } catch (JMSException e) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java index ed5dcae095..5cb23b3ef2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java @@ -56,7 +56,6 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -312,7 +311,7 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase { private static class MyFailoverListener implements FailoverEventListener { - private List eventTypeList = Collections.synchronizedList(new ArrayList()); + private List eventTypeList = new ArrayList<>(); public FailoverEventType get(int element) { waitForElements(element + 1);