diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index b3ec0aed5d..813f9fdfb9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -25,8 +25,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -73,7 +73,7 @@ public class AmqpConnection extends AmqpAbstractResource implements public static final long DEFAULT_CLOSE_TIMEOUT = 30000; public static final long DEFAULT_DRAIN_TIMEOUT = 60000; - private final ScheduledExecutorService serializer; + private ScheduledThreadPoolExecutor serializer; private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean connected = new AtomicBoolean(); private final AtomicLong sessionIdGenerator = new AtomicLong(); @@ -116,7 +116,7 @@ public class AmqpConnection extends AmqpAbstractResource implements this.connectionId = CONNECTION_ID_GENERATOR.generateId(); this.remoteURI = transport.getRemoteLocation(); - this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + this.serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable runner) { @@ -127,6 +127,10 @@ public class AmqpConnection extends AmqpAbstractResource implements } }); + // Ensure timely shutdown + this.serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + this.serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.transport.setTransportListener(this); }