diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 61b46a0145..c8a6966ed9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -27,13 +27,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; -import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; @@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueInfo; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; @@ -112,8 +113,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding private Reaper reaperRunnable; - private volatile Thread reaperThread; - private final long reaperPeriod; private final int reaperPriority; @@ -198,12 +197,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (reaperRunnable != null) reaperRunnable.stop(); - if (reaperThread != null) { - reaperThread.join(); - - reaperThread = null; - } - addressManager.clear(); queueInfos.clear(); @@ -1244,12 +1237,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (reaperPeriod > 0) { if (reaperRunnable != null) reaperRunnable.stop(); - reaperRunnable = new Reaper(); - reaperThread = new Thread(reaperRunnable, "activemq-expiry-reaper-thread"); + reaperRunnable = new Reaper(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), reaperPeriod, TimeUnit.MILLISECONDS, false); - reaperThread.setPriority(reaperPriority); - - reaperThread.start(); + reaperRunnable.start(); } } @@ -1268,48 +1258,38 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return message; } - private final class Reaper implements Runnable { + private final class Reaper extends ActiveMQScheduledComponent { - private final CountDownLatch latch = new CountDownLatch(1); - - public void stop() { - latch.countDown(); + Reaper(ScheduledExecutorService scheduledExecutorService, + Executor executor, + long checkPeriod, + TimeUnit timeUnit, + boolean onDemand) { + super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand); } @Override public void run() { // The reaper thread should be finished case the PostOffice is gone // This is to avoid leaks on PostOffice between stops and starts - while (isStarted()) { + Map<SimpleString, Binding> nameMap = addressManager.getBindings(); + + List<Queue> queues = new ArrayList<>(); + + for (Binding binding : nameMap.values()) { + if (binding.getType() == BindingType.LOCAL_QUEUE) { + Queue queue = (Queue) binding.getBindable(); + + queues.add(queue); + } + } + + for (Queue queue : queues) { try { - if (latch.await(reaperPeriod, TimeUnit.MILLISECONDS)) - return; + queue.expireReferences(); } - catch (InterruptedException e1) { - throw new ActiveMQInterruptedException(e1); - } - if (!isStarted()) - return; - - Map<SimpleString, Binding> nameMap = addressManager.getBindings(); - - List<Queue> queues = new ArrayList<>(); - - for (Binding binding : nameMap.values()) { - if (binding.getType() == BindingType.LOCAL_QUEUE) { - Queue queue = (Queue) binding.getBindable(); - - queues.add(queue); - } - } - - for (Queue queue : queues) { - try { - queue.expireReferences(); - } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorExpiringMessages(e); - } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorExpiringMessages(e); } } }