From 1af6d986a570f4c0db87cdd141fad3c66b84ea92 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 27 Oct 2017 15:14:28 -0400 Subject: [PATCH] ARTEMIS-1447 Reuse thread pools on JDBC Locks I had moved the initialization of the thread pools for that.. I also have changed the CrticialAnalyzer to use the same pools as part of this --- .../server/ActiveMQScheduledComponent.java | 7 ++ .../utils/critical/CriticalAnalyzerImpl.java | 85 +++++++------------ .../core/server/impl/ActiveMQServerImpl.java | 12 +-- 3 files changed, 47 insertions(+), 57 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java index d9a8938e8f..0ca8255033 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java @@ -188,6 +188,13 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R return this; } + public synchronized ActiveMQScheduledComponent setPeriod(long period, TimeUnit unit) { + this.period = period; + this.timeUnit = unit; + restartIfNeeded(); + return this; + } + public long getInitialDelay() { return initialDelay; } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java index 55c1854d69..1c2c0ebc02 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java @@ -19,9 +19,10 @@ package org.apache.activemq.artemis.utils.critical; import java.util.ConcurrentModificationException; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.jboss.logging.Logger; @@ -31,7 +32,29 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { private volatile long timeoutNanoSeconds; - private volatile long checkTimeNanoSeconds; + // one minute by default.. the server will change it for sure + private volatile long checkTimeNanoSeconds = TimeUnit.SECONDS.toNanos(60); + + private final ActiveMQScheduledComponent scheduledComponent; + + private final AtomicBoolean running = new AtomicBoolean(false); + + public CriticalAnalyzerImpl() { + // this will make the scheduled component to start its own pool + + /* Important: The scheduled component should have its own thread pool... + * otherwise in case of a deadlock, or a starvation of the server the analyzer won't pick up any + * issues and won't be able to shutdown the server or halt the VM + */ + this.scheduledComponent = new ActiveMQScheduledComponent(null, null, checkTimeNanoSeconds, TimeUnit.NANOSECONDS, false) { + @Override + public void run() { + logger.trace("Checking critical analyzer"); + check(); + } + }; + + } @Override public void clear() { @@ -41,10 +64,6 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { private CopyOnWriteArrayList actions = new CopyOnWriteArrayList<>(); - private Thread thread; - - private final Semaphore running = new Semaphore(1); - private final ConcurrentHashSet components = new ConcurrentHashSet<>(); @Override @@ -65,6 +84,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { @Override public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) { this.checkTimeNanoSeconds = unit.toNanos(timeout); + this.scheduledComponent.setPeriod(timeout, unit); return this; } @@ -78,6 +98,9 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { @Override public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) { + if (checkTimeNanoSeconds <= 0) { + this.setCheckTime(timeout / 2, unit); + } this.timeoutNanoSeconds = unit.toNanos(timeout); return this; } @@ -117,7 +140,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { } private void fireAction(CriticalComponent component) { - for (CriticalAction action: actions) { + for (CriticalAction action : actions) { try { action.run(component); } catch (Throwable e) { @@ -128,59 +151,17 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { @Override public void start() { + scheduledComponent.start(); - if (!running.tryAcquire()) { - // already running - return; - } - - // we are not using any Thread Pool or any Scheduled Executors from the ArtemisServer - // as that would defeat the purpose, - // as in any deadlocks the schedulers may be starving for something not responding fast enough - thread = new Thread("Artemis Critical Analyzer") { - @Override - public void run() { - try { - while (true) { - if (running.tryAcquire(getCheckTimeNanoSeconds(), TimeUnit.NANOSECONDS)) { - running.release(); - // this means that the server has been stopped as we could acquire the semaphore... returning now - break; - } - logger.trace("Checking critical analyzer"); - check(); - } - } catch (InterruptedException interrupted) { - // i will just leave on that case - } - } - }; - - thread.setDaemon(true); - - thread.start(); } @Override public void stop() { - if (!isStarted()) { - // already stopped, leaving - return; - } - - running.release(); - - try { - if (thread != null) { - thread.join(); - } - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } + scheduledComponent.stop(); } @Override public boolean isStarted() { - return running.availablePermits() == 0; + return scheduledComponent.isStarted(); } } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 7b4adb4866..fff7a57648 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -507,6 +507,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { configuration.parseSystemProperties(); + initializeExecutorServices(); + initializeCriticalAnalyzer(); startDate = new Date(); @@ -575,13 +577,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { private void initializeCriticalAnalyzer() throws Exception { if (analyzer == null) { if (configuration.isCriticalAnalyzer()) { + // this will have its own ScheduledPool this.analyzer = new CriticalAnalyzerImpl(); } else { this.analyzer = EmptyCriticalAnalyzer.getInstance(); } } - /** Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/ + /* Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/ this.analyzer.clear(); this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS); @@ -1181,9 +1184,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { } try { - this.getCriticalAnalyzer().stop(); + this.analyzer.stop(); } catch (Exception e) { logger.warn(e.getMessage(), e); + } finally { + this.analyzer = null; } if (identity != null) { @@ -2257,9 +2262,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (state == SERVER_STATE.STOPPED) return false; - // Create the pools - we have two pools - one for non scheduled - and another for scheduled - initializeExecutorServices(); - if (configuration.getJournalType() == JournalType.ASYNCIO) { if (!AIOSequentialFileFactory.isSupported()) { ActiveMQServerLogger.LOGGER.switchingNIO();