ARTEMIS-734 small improvement: use ActiveMQScheduledComponent

This commit is contained in:
Clebert Suconic 2016-09-14 18:51:13 -04:00 committed by Martyn Taylor
parent 2509eeb818
commit 6b5fff40cb
1 changed files with 28 additions and 48 deletions

View File

@ -27,13 +27,13 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueInfo;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
@ -112,8 +113,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
private Reaper reaperRunnable;
private volatile Thread reaperThread;
private final long reaperPeriod;
private final int reaperPriority;
@ -198,12 +197,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (reaperRunnable != null)
reaperRunnable.stop();
if (reaperThread != null) {
reaperThread.join();
reaperThread = null;
}
addressManager.clear();
queueInfos.clear();
@ -1244,12 +1237,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (reaperPeriod > 0) {
if (reaperRunnable != null)
reaperRunnable.stop();
reaperRunnable = new Reaper();
reaperThread = new Thread(reaperRunnable, "activemq-expiry-reaper-thread");
reaperRunnable = new Reaper(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), reaperPeriod, TimeUnit.MILLISECONDS, false);
reaperThread.setPriority(reaperPriority);
reaperThread.start();
reaperRunnable.start();
}
}
@ -1268,48 +1258,38 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return message;
}
private final class Reaper implements Runnable {
private final class Reaper extends ActiveMQScheduledComponent {
private final CountDownLatch latch = new CountDownLatch(1);
public void stop() {
latch.countDown();
Reaper(ScheduledExecutorService scheduledExecutorService,
Executor executor,
long checkPeriod,
TimeUnit timeUnit,
boolean onDemand) {
super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand);
}
@Override
public void run() {
// The reaper thread should be finished case the PostOffice is gone
// This is to avoid leaks on PostOffice between stops and starts
while (isStarted()) {
Map<SimpleString, Binding> nameMap = addressManager.getBindings();
List<Queue> queues = new ArrayList<>();
for (Binding binding : nameMap.values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
Queue queue = (Queue) binding.getBindable();
queues.add(queue);
}
}
for (Queue queue : queues) {
try {
if (latch.await(reaperPeriod, TimeUnit.MILLISECONDS))
return;
queue.expireReferences();
}
catch (InterruptedException e1) {
throw new ActiveMQInterruptedException(e1);
}
if (!isStarted())
return;
Map<SimpleString, Binding> nameMap = addressManager.getBindings();
List<Queue> queues = new ArrayList<>();
for (Binding binding : nameMap.values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
Queue queue = (Queue) binding.getBindable();
queues.add(queue);
}
}
for (Queue queue : queues) {
try {
queue.expireReferences();
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringMessages(e);
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringMessages(e);
}
}
}