diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 9f55cb4951..92db402bf8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -70,7 +70,6 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.MessageEvaluationContext; -import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; @@ -93,7 +92,7 @@ import org.slf4j.LoggerFactory; */ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); - private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory(); + private TaskRunnerFactory asyncTaskRunner; protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; protected final Transport localBroker; protected final Transport remoteBroker; @@ -157,6 +156,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br public void start() throws Exception { if (started.compareAndSet(false, true)) { + asyncTaskRunner = new TaskRunnerFactory("ActiveMQ ForwardingBridge Task"); + asyncTaskRunner.init(); + localBroker.setTransportListener(new DefaultTransportListener() { @Override @@ -374,6 +376,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br startedLatch.countDown(); startedLatch.countDown(); localStartedLatch.countDown(); + + // stop task runner + asyncTaskRunner.shutdown(); + asyncTaskRunner = null; ss.throwFirstException(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java b/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java index 08b31d15a0..e03eef60f3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java @@ -16,35 +16,18 @@ */ package org.apache.activemq.thread; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; - /** - * - * + * @deprecated do not use this class. Instead use {@link TaskRunnerFactory} */ +@Deprecated public final class DefaultThreadPools { -// private static final Executor DEFAULT_POOL; -// static { -// DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { -// public Thread newThread(Runnable runnable) { -// Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread"); -// thread.setDaemon(true); -// return thread; -// } -// }); -// } private static final TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new TaskRunnerFactory(); private DefaultThreadPools() { } -// public static Executor getDefaultPool() { -// return DEFAULT_POOL; -// } - + @Deprecated public static TaskRunnerFactory getDefaultTaskRunnerFactory() { return DEFAULT_TASK_RUNNER_FACTORY; } diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java index 24d23fa6f5..3f76a91c19 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java @@ -49,7 +49,11 @@ public class TaskRunnerFactory implements Executor { private RejectedExecutionHandler rejectedTaskHandler = null; public TaskRunnerFactory() { - this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000); + this("ActiveMQ Task"); + } + + public TaskRunnerFactory(String name) { + this(name, Thread.NORM_PRIORITY, true, 1000); } private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) { @@ -98,7 +102,7 @@ public class TaskRunnerFactory implements Executor { } public void execute(Runnable runnable) { - execute(runnable, "ActiveMQ Task"); + execute(runnable, name); } public void execute(Runnable runnable, String name) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java index ad884ad5b8..316ce778eb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java @@ -21,7 +21,7 @@ import java.net.URI; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.DiscoveryEvent; -import org.apache.activemq.thread.DefaultThreadPools; +import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.discovery.DiscoveryAgent; import org.apache.activemq.transport.discovery.DiscoveryListener; import org.slf4j.Logger; @@ -46,6 +46,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent { private DiscoveryListener listener; private String services[] = new String[] {}; private final AtomicBoolean running = new AtomicBoolean(false); + private TaskRunnerFactory taskRunner; class SimpleDiscoveryEvent extends DiscoveryEvent { @@ -72,6 +73,9 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent { } public void start() throws Exception { + taskRunner = new TaskRunnerFactory(); + taskRunner.init(); + running.set(true); for (int i = 0; i < services.length; i++) { listener.onServiceAdd(new SimpleDiscoveryEvent(services[i])); @@ -80,6 +84,11 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent { public void stop() throws Exception { running.set(false); + + taskRunner.shutdown(); + + // TODO: Should we not remove the services on the listener? + synchronized (sleepMutex) { sleepMutex.notifyAll(); } @@ -110,7 +119,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent { if (event.failed.compareAndSet(false, true)) { listener.onServiceRemove(event); - DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { + taskRunner.execute(new Runnable() { public void run() { // We detect a failed connection attempt because the service