From 4f7c9ec811c10cf22bc0e93a1a7b049cd5f3fd07 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 11 May 2017 15:59:08 -0400 Subject: [PATCH] AMQ-6674 Ensure timely shutdown of the connection executor Don't wait for next idle check or other scheduled tasks to run before shutdown can proceed. --- .../activemq/transport/amqp/client/AmqpConnection.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index b3ec0aed5d..813f9fdfb9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -25,8 +25,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -73,7 +73,7 @@ public class AmqpConnection extends AmqpAbstractResource implements public static final long DEFAULT_CLOSE_TIMEOUT = 30000; public static final long DEFAULT_DRAIN_TIMEOUT = 60000; - private final ScheduledExecutorService serializer; + private ScheduledThreadPoolExecutor serializer; private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean connected = new AtomicBoolean(); private final AtomicLong sessionIdGenerator = new AtomicLong(); @@ -116,7 +116,7 @@ public class AmqpConnection extends AmqpAbstractResource implements this.connectionId = CONNECTION_ID_GENERATOR.generateId(); this.remoteURI = transport.getRemoteLocation(); - this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + this.serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable runner) { @@ -127,6 +127,10 @@ public class AmqpConnection extends AmqpAbstractResource implements } }); + // Ensure timely shutdown + this.serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + this.serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.transport.setTransportListener(this); }