ARTEMIS-1447 Reuse thread pools on JDBC Locks
I had moved the initialization of the thread pools for that.. I also have changed the CrticialAnalyzer to use the same pools as part of this
This commit is contained in:
parent
6cc786ff15
commit
1af6d986a5
|
@ -188,6 +188,13 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized ActiveMQScheduledComponent setPeriod(long period, TimeUnit unit) {
|
||||||
|
this.period = period;
|
||||||
|
this.timeUnit = unit;
|
||||||
|
restartIfNeeded();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public long getInitialDelay() {
|
public long getInitialDelay() {
|
||||||
return initialDelay;
|
return initialDelay;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,9 +19,10 @@ package org.apache.activemq.artemis.utils.critical;
|
||||||
|
|
||||||
import java.util.ConcurrentModificationException;
|
import java.util.ConcurrentModificationException;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.Semaphore;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
|
||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
@ -31,7 +32,29 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
||||||
|
|
||||||
private volatile long timeoutNanoSeconds;
|
private volatile long timeoutNanoSeconds;
|
||||||
|
|
||||||
private volatile long checkTimeNanoSeconds;
|
// one minute by default.. the server will change it for sure
|
||||||
|
private volatile long checkTimeNanoSeconds = TimeUnit.SECONDS.toNanos(60);
|
||||||
|
|
||||||
|
private final ActiveMQScheduledComponent scheduledComponent;
|
||||||
|
|
||||||
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
public CriticalAnalyzerImpl() {
|
||||||
|
// this will make the scheduled component to start its own pool
|
||||||
|
|
||||||
|
/* Important: The scheduled component should have its own thread pool...
|
||||||
|
* otherwise in case of a deadlock, or a starvation of the server the analyzer won't pick up any
|
||||||
|
* issues and won't be able to shutdown the server or halt the VM
|
||||||
|
*/
|
||||||
|
this.scheduledComponent = new ActiveMQScheduledComponent(null, null, checkTimeNanoSeconds, TimeUnit.NANOSECONDS, false) {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
logger.trace("Checking critical analyzer");
|
||||||
|
check();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clear() {
|
public void clear() {
|
||||||
|
@ -41,10 +64,6 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
||||||
|
|
||||||
private CopyOnWriteArrayList<CriticalAction> actions = new CopyOnWriteArrayList<>();
|
private CopyOnWriteArrayList<CriticalAction> actions = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
private Thread thread;
|
|
||||||
|
|
||||||
private final Semaphore running = new Semaphore(1);
|
|
||||||
|
|
||||||
private final ConcurrentHashSet<CriticalComponent> components = new ConcurrentHashSet<>();
|
private final ConcurrentHashSet<CriticalComponent> components = new ConcurrentHashSet<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -65,6 +84,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
||||||
@Override
|
@Override
|
||||||
public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
|
public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
|
||||||
this.checkTimeNanoSeconds = unit.toNanos(timeout);
|
this.checkTimeNanoSeconds = unit.toNanos(timeout);
|
||||||
|
this.scheduledComponent.setPeriod(timeout, unit);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,6 +98,9 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
|
public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
|
||||||
|
if (checkTimeNanoSeconds <= 0) {
|
||||||
|
this.setCheckTime(timeout / 2, unit);
|
||||||
|
}
|
||||||
this.timeoutNanoSeconds = unit.toNanos(timeout);
|
this.timeoutNanoSeconds = unit.toNanos(timeout);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
@ -117,7 +140,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fireAction(CriticalComponent component) {
|
private void fireAction(CriticalComponent component) {
|
||||||
for (CriticalAction action: actions) {
|
for (CriticalAction action : actions) {
|
||||||
try {
|
try {
|
||||||
action.run(component);
|
action.run(component);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
@ -128,59 +151,17 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
|
scheduledComponent.start();
|
||||||
|
|
||||||
if (!running.tryAcquire()) {
|
|
||||||
// already running
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// we are not using any Thread Pool or any Scheduled Executors from the ArtemisServer
|
|
||||||
// as that would defeat the purpose,
|
|
||||||
// as in any deadlocks the schedulers may be starving for something not responding fast enough
|
|
||||||
thread = new Thread("Artemis Critical Analyzer") {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
while (true) {
|
|
||||||
if (running.tryAcquire(getCheckTimeNanoSeconds(), TimeUnit.NANOSECONDS)) {
|
|
||||||
running.release();
|
|
||||||
// this means that the server has been stopped as we could acquire the semaphore... returning now
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
logger.trace("Checking critical analyzer");
|
|
||||||
check();
|
|
||||||
}
|
|
||||||
} catch (InterruptedException interrupted) {
|
|
||||||
// i will just leave on that case
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
thread.setDaemon(true);
|
|
||||||
|
|
||||||
thread.start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void stop() {
|
||||||
if (!isStarted()) {
|
scheduledComponent.stop();
|
||||||
// already stopped, leaving
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
running.release();
|
|
||||||
|
|
||||||
try {
|
|
||||||
if (thread != null) {
|
|
||||||
thread.join();
|
|
||||||
}
|
|
||||||
} catch (Throwable e) {
|
|
||||||
logger.warn(e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isStarted() {
|
public boolean isStarted() {
|
||||||
return running.availablePermits() == 0;
|
return scheduledComponent.isStarted();
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -507,6 +507,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
configuration.parseSystemProperties();
|
configuration.parseSystemProperties();
|
||||||
|
|
||||||
|
initializeExecutorServices();
|
||||||
|
|
||||||
initializeCriticalAnalyzer();
|
initializeCriticalAnalyzer();
|
||||||
|
|
||||||
startDate = new Date();
|
startDate = new Date();
|
||||||
|
@ -575,13 +577,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
private void initializeCriticalAnalyzer() throws Exception {
|
private void initializeCriticalAnalyzer() throws Exception {
|
||||||
if (analyzer == null) {
|
if (analyzer == null) {
|
||||||
if (configuration.isCriticalAnalyzer()) {
|
if (configuration.isCriticalAnalyzer()) {
|
||||||
|
// this will have its own ScheduledPool
|
||||||
this.analyzer = new CriticalAnalyzerImpl();
|
this.analyzer = new CriticalAnalyzerImpl();
|
||||||
} else {
|
} else {
|
||||||
this.analyzer = EmptyCriticalAnalyzer.getInstance();
|
this.analyzer = EmptyCriticalAnalyzer.getInstance();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/
|
/* Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/
|
||||||
this.analyzer.clear();
|
this.analyzer.clear();
|
||||||
|
|
||||||
this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS);
|
this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS);
|
||||||
|
@ -1181,9 +1184,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.getCriticalAnalyzer().stop();
|
this.analyzer.stop();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn(e.getMessage(), e);
|
logger.warn(e.getMessage(), e);
|
||||||
|
} finally {
|
||||||
|
this.analyzer = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (identity != null) {
|
if (identity != null) {
|
||||||
|
@ -2257,9 +2262,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
if (state == SERVER_STATE.STOPPED)
|
if (state == SERVER_STATE.STOPPED)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
// Create the pools - we have two pools - one for non scheduled - and another for scheduled
|
|
||||||
initializeExecutorServices();
|
|
||||||
|
|
||||||
if (configuration.getJournalType() == JournalType.ASYNCIO) {
|
if (configuration.getJournalType() == JournalType.ASYNCIO) {
|
||||||
if (!AIOSequentialFileFactory.isSupported()) {
|
if (!AIOSequentialFileFactory.isSupported()) {
|
||||||
ActiveMQServerLogger.LOGGER.switchingNIO();
|
ActiveMQServerLogger.LOGGER.switchingNIO();
|
||||||
|
|
Loading…
Reference in New Issue