This closes #793
This commit is contained in:
commit
42c7080941
|
@ -20,13 +20,14 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
|
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
|
||||||
import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
|
import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A MessageCounterManager
|
* A MessageCounterManager
|
||||||
|
@ -41,45 +42,25 @@ public class MessageCounterManagerImpl implements MessageCounterManager {
|
||||||
|
|
||||||
private final Map<String, MessageCounter> messageCounters;
|
private final Map<String, MessageCounter> messageCounters;
|
||||||
|
|
||||||
private boolean started;
|
private final MessageCountersPinger messageCountersPinger;
|
||||||
|
|
||||||
private long period = MessageCounterManagerImpl.DEFAULT_SAMPLE_PERIOD;
|
|
||||||
|
|
||||||
private MessageCountersPinger messageCountersPinger;
|
|
||||||
|
|
||||||
private int maxDayCount = MessageCounterManagerImpl.DEFAULT_MAX_DAY_COUNT;
|
private int maxDayCount = MessageCounterManagerImpl.DEFAULT_MAX_DAY_COUNT;
|
||||||
|
|
||||||
private final ScheduledExecutorService scheduledThreadPool;
|
public MessageCounterManagerImpl(final ScheduledExecutorService scheduledThreadPool, Executor executor) {
|
||||||
|
|
||||||
public MessageCounterManagerImpl(final ScheduledExecutorService scheduledThreadPool) {
|
|
||||||
messageCounters = new HashMap<>();
|
messageCounters = new HashMap<>();
|
||||||
|
messageCountersPinger = new MessageCountersPinger(scheduledThreadPool, executor, MessageCounterManagerImpl.DEFAULT_SAMPLE_PERIOD, TimeUnit.MILLISECONDS, false);
|
||||||
this.scheduledThreadPool = scheduledThreadPool;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void start() {
|
public synchronized void start() {
|
||||||
if (started) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
messageCountersPinger = new MessageCountersPinger();
|
messageCountersPinger.start();
|
||||||
|
|
||||||
Future<?> future = scheduledThreadPool.scheduleAtFixedRate(messageCountersPinger, 0, period, TimeUnit.MILLISECONDS);
|
|
||||||
messageCountersPinger.setFuture(future);
|
|
||||||
|
|
||||||
started = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void stop() {
|
public synchronized void stop() {
|
||||||
if (!started) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
messageCountersPinger.stop();
|
messageCountersPinger.stop();
|
||||||
|
|
||||||
started = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -89,22 +70,12 @@ public class MessageCounterManagerImpl implements MessageCounterManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void reschedule(final long newPeriod) {
|
public synchronized void reschedule(final long newPeriod) {
|
||||||
boolean wasStarted = started;
|
messageCountersPinger.setPeriod(newPeriod);
|
||||||
|
|
||||||
if (wasStarted) {
|
|
||||||
stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
period = newPeriod;
|
|
||||||
|
|
||||||
if (wasStarted) {
|
|
||||||
start();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getSamplePeriod() {
|
public long getSamplePeriod() {
|
||||||
return period;
|
return messageCountersPinger.getPeriod();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -155,17 +126,18 @@ public class MessageCounterManagerImpl implements MessageCounterManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class MessageCountersPinger implements Runnable {
|
private class MessageCountersPinger extends ActiveMQScheduledComponent {
|
||||||
|
|
||||||
private boolean closed = false;
|
MessageCountersPinger(ScheduledExecutorService scheduledExecutorService,
|
||||||
|
Executor executor,
|
||||||
private Future<?> future;
|
long checkPeriod,
|
||||||
|
TimeUnit timeUnit,
|
||||||
|
boolean onDemand) {
|
||||||
|
super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void run() {
|
public void run() {
|
||||||
if (closed) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (messageCounters) {
|
synchronized (messageCounters) {
|
||||||
for (MessageCounter counter : messageCounters.values()) {
|
for (MessageCounter counter : messageCounters.values()) {
|
||||||
|
@ -174,17 +146,6 @@ public class MessageCounterManagerImpl implements MessageCounterManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setFuture(final Future<?> future) {
|
|
||||||
this.future = future;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized void stop() {
|
|
||||||
if (future != null) {
|
|
||||||
future.cancel(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
closed = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -190,7 +190,7 @@ public class ManagementServiceImpl implements ManagementService {
|
||||||
this.messagingServer = messagingServer;
|
this.messagingServer = messagingServer;
|
||||||
this.pagingManager = pagingManager;
|
this.pagingManager = pagingManager;
|
||||||
|
|
||||||
messageCounterManager = new MessageCounterManagerImpl(scheduledThreadPool);
|
messageCounterManager = new MessageCounterManagerImpl(scheduledThreadPool, messagingServer.getExecutorFactory().getExecutor());
|
||||||
messageCounterManager.setMaxDayCount(configuration.getMessageCounterMaxDayHistory());
|
messageCounterManager.setMaxDayCount(configuration.getMessageCounterMaxDayHistory());
|
||||||
messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
|
messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue