This closes #1620
This commit is contained in:
commit
60ce40f3a8
|
@ -188,6 +188,13 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
|
|||
return this;
|
||||
}
|
||||
|
||||
public synchronized ActiveMQScheduledComponent setPeriod(long period, TimeUnit unit) {
|
||||
this.period = period;
|
||||
this.timeUnit = unit;
|
||||
restartIfNeeded();
|
||||
return this;
|
||||
}
|
||||
|
||||
public long getInitialDelay() {
|
||||
return initialDelay;
|
||||
}
|
||||
|
|
|
@ -19,9 +19,10 @@ package org.apache.activemq.artemis.utils.critical;
|
|||
|
||||
import java.util.ConcurrentModificationException;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
|
||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
|
@ -31,7 +32,29 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
|||
|
||||
private volatile long timeoutNanoSeconds;
|
||||
|
||||
private volatile long checkTimeNanoSeconds;
|
||||
// one minute by default.. the server will change it for sure
|
||||
private volatile long checkTimeNanoSeconds = TimeUnit.SECONDS.toNanos(60);
|
||||
|
||||
private final ActiveMQScheduledComponent scheduledComponent;
|
||||
|
||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||
|
||||
public CriticalAnalyzerImpl() {
|
||||
// this will make the scheduled component to start its own pool
|
||||
|
||||
/* Important: The scheduled component should have its own thread pool...
|
||||
* otherwise in case of a deadlock, or a starvation of the server the analyzer won't pick up any
|
||||
* issues and won't be able to shutdown the server or halt the VM
|
||||
*/
|
||||
this.scheduledComponent = new ActiveMQScheduledComponent(null, null, checkTimeNanoSeconds, TimeUnit.NANOSECONDS, false) {
|
||||
@Override
|
||||
public void run() {
|
||||
logger.trace("Checking critical analyzer");
|
||||
check();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
|
@ -41,10 +64,6 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
|||
|
||||
private CopyOnWriteArrayList<CriticalAction> actions = new CopyOnWriteArrayList<>();
|
||||
|
||||
private Thread thread;
|
||||
|
||||
private final Semaphore running = new Semaphore(1);
|
||||
|
||||
private final ConcurrentHashSet<CriticalComponent> components = new ConcurrentHashSet<>();
|
||||
|
||||
@Override
|
||||
|
@ -65,6 +84,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
|||
@Override
|
||||
public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
|
||||
this.checkTimeNanoSeconds = unit.toNanos(timeout);
|
||||
this.scheduledComponent.setPeriod(timeout, unit);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -78,6 +98,9 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
|||
|
||||
@Override
|
||||
public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
|
||||
if (checkTimeNanoSeconds <= 0) {
|
||||
this.setCheckTime(timeout / 2, unit);
|
||||
}
|
||||
this.timeoutNanoSeconds = unit.toNanos(timeout);
|
||||
return this;
|
||||
}
|
||||
|
@ -117,7 +140,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
|||
}
|
||||
|
||||
private void fireAction(CriticalComponent component) {
|
||||
for (CriticalAction action: actions) {
|
||||
for (CriticalAction action : actions) {
|
||||
try {
|
||||
action.run(component);
|
||||
} catch (Throwable e) {
|
||||
|
@ -128,59 +151,17 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
|||
|
||||
@Override
|
||||
public void start() {
|
||||
scheduledComponent.start();
|
||||
|
||||
if (!running.tryAcquire()) {
|
||||
// already running
|
||||
return;
|
||||
}
|
||||
|
||||
// we are not using any Thread Pool or any Scheduled Executors from the ArtemisServer
|
||||
// as that would defeat the purpose,
|
||||
// as in any deadlocks the schedulers may be starving for something not responding fast enough
|
||||
thread = new Thread("Artemis Critical Analyzer") {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (true) {
|
||||
if (running.tryAcquire(getCheckTimeNanoSeconds(), TimeUnit.NANOSECONDS)) {
|
||||
running.release();
|
||||
// this means that the server has been stopped as we could acquire the semaphore... returning now
|
||||
break;
|
||||
}
|
||||
logger.trace("Checking critical analyzer");
|
||||
check();
|
||||
}
|
||||
} catch (InterruptedException interrupted) {
|
||||
// i will just leave on that case
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
thread.setDaemon(true);
|
||||
|
||||
thread.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (!isStarted()) {
|
||||
// already stopped, leaving
|
||||
return;
|
||||
}
|
||||
|
||||
running.release();
|
||||
|
||||
try {
|
||||
if (thread != null) {
|
||||
thread.join();
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
scheduledComponent.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStarted() {
|
||||
return running.availablePermits() == 0;
|
||||
return scheduledComponent.isStarted();
|
||||
}
|
||||
}
|
|
@ -507,6 +507,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
configuration.parseSystemProperties();
|
||||
|
||||
initializeExecutorServices();
|
||||
|
||||
initializeCriticalAnalyzer();
|
||||
|
||||
startDate = new Date();
|
||||
|
@ -575,13 +577,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
private void initializeCriticalAnalyzer() throws Exception {
|
||||
if (analyzer == null) {
|
||||
if (configuration.isCriticalAnalyzer()) {
|
||||
// this will have its own ScheduledPool
|
||||
this.analyzer = new CriticalAnalyzerImpl();
|
||||
} else {
|
||||
this.analyzer = EmptyCriticalAnalyzer.getInstance();
|
||||
}
|
||||
}
|
||||
|
||||
/** Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/
|
||||
/* Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/
|
||||
this.analyzer.clear();
|
||||
|
||||
this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS);
|
||||
|
@ -1181,9 +1184,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
|
||||
try {
|
||||
this.getCriticalAnalyzer().stop();
|
||||
this.analyzer.stop();
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
} finally {
|
||||
this.analyzer = null;
|
||||
}
|
||||
|
||||
if (identity != null) {
|
||||
|
@ -2257,9 +2262,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
if (state == SERVER_STATE.STOPPED)
|
||||
return false;
|
||||
|
||||
// Create the pools - we have two pools - one for non scheduled - and another for scheduled
|
||||
initializeExecutorServices();
|
||||
|
||||
if (configuration.getJournalType() == JournalType.ASYNCIO) {
|
||||
if (!AIOSequentialFileFactory.isSupported()) {
|
||||
ActiveMQServerLogger.LOGGER.switchingNIO();
|
||||
|
|
Loading…
Reference in New Issue