From f8278ec99c45f5ff58a11f6678bd2333caa3e01a Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 8 Sep 2016 20:46:21 -0400 Subject: [PATCH] ARTEMIS-727 Improving Thread usage on JDBC https://issues.apache.org/jira/browse/ARTEMIS-727 --- artemis-commons/pom.xml | 18 ++++ .../server/ActiveMQScheduledComponent.java | 44 ++++++++- .../artemis/utils}/ThreadLeakCheckRule.java | 92 +++++++++++++++++-- artemis-jdbc-store/pom.xml | 7 ++ .../jdbc/store/journal/JDBCJournalImpl.java | 33 ++++--- .../jdbc/store/journal/JDBCJournalSync.java | 16 +++- .../file/JDBCSequentialFileFactoryTest.java | 15 +++ .../JDBCJournalLoaderCallbackTest.java | 16 ++++ .../journal/impl/SimpleWaitIOCallback.java | 3 + artemis-server/pom.xml | 9 ++ .../core/paging/impl/PageSyncTimer.java | 15 ++- .../core/paging/impl/PagingStoreImpl.java | 2 +- .../AbstractJournalStorageManager.java | 12 ++- .../journal/JDBCJournalStorageManager.java | 20 ++-- .../impl/journal/JournalStorageManager.java | 28 +++++- .../core/server/files/FileStoreMonitor.java | 8 +- .../core/server/impl/ActiveMQServerImpl.java | 8 +- .../core/server/reload/ReloadManagerImpl.java | 6 +- .../core/reload/ReloadManagerTest.java | 8 +- .../server/files/FileMoveManagerTest.java | 2 +- .../server/files/FileStoreMonitorTest.java | 9 +- .../artemis/tests/util/ActiveMQTestBase.java | 80 +++------------- tests/activemq5-unit-tests/pom.xml | 7 ++ tests/extra-tests/pom.xml | 7 ++ tests/integration-tests/pom.xml | 7 ++ .../broadcast/JGroupsBroadcastTest.java | 2 +- .../jdbc/store/journal/JDBCJournalTest.java | 37 ++++++-- .../journal/NIOJournalCompactTest.java | 2 +- .../DeleteMessagesOnStartupTest.java | 2 +- .../persistence/RestartSMTest.java | 2 +- .../persistence/StorageManagerTestBase.java | 12 ++- .../replication/ReplicationTest.java | 2 +- tests/unit-tests/pom.xml | 7 ++ .../impl/DuplicateDetectionUnitTest.java | 6 +- 34 files changed, 398 insertions(+), 146 deletions(-) rename {artemis-server => artemis-commons}/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java (67%) rename {artemis-server/src/test/java/org/apache/activemq/artemis/tests/util => artemis-commons/src/test/java/org/apache/activemq/artemis/utils}/ThreadLeakCheckRule.java (72%) diff --git a/artemis-commons/pom.xml b/artemis-commons/pom.xml index 98566e4bfa..da14fe8b65 100644 --- a/artemis-commons/pom.xml +++ b/artemis-commons/pom.xml @@ -67,4 +67,22 @@ + + + + org.apache.maven.plugins + maven-jar-plugin + + + test + + test-jar + + + + + + + + diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java similarity index 67% rename from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java rename to artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java index dadf171a70..4d503ebf93 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java @@ -17,9 +17,11 @@ package org.apache.activemq.artemis.core.server; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.jboss.logging.Logger; @@ -30,14 +32,25 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R private final ScheduledExecutorService scheduledExecutorService; private long period; private TimeUnit timeUnit; + private final Executor executor; private ScheduledFuture future; + private final boolean onDemand; + + private final AtomicInteger delayed = new AtomicInteger(0); public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService, + Executor executor, long checkPeriod, - TimeUnit timeUnit) { + TimeUnit timeUnit, + boolean onDemand) { + this.executor = executor; this.scheduledExecutorService = scheduledExecutorService; + if (this.scheduledExecutorService == null) { + throw new NullPointerException("scheduled Executor is null"); + } this.period = checkPeriod; this.timeUnit = timeUnit; + this.onDemand = onDemand; } @Override @@ -45,14 +58,30 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R if (future != null) { return; } + if (onDemand) { + return; + } if (period >= 0) { - future = scheduledExecutorService.scheduleWithFixedDelay(this, period, period, timeUnit); + future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, period, period, timeUnit); } else { logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period); } } + public void delay() { + int value = delayed.incrementAndGet(); + if (value > 10) { + delayed.decrementAndGet(); + } + else { + // We only schedule up to 10 periods upfront. + // this is to avoid a window where a current one would be running and a next one is coming. + // in theory just 2 would be enough. I'm using 10 as a precaution here. + scheduledExecutorService.schedule(runForScheduler, Math.min(period, period * value), timeUnit); + } + } + public long getPeriod() { return period; } @@ -84,6 +113,10 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R } + public void run() { + delayed.decrementAndGet(); + } + @Override public synchronized boolean isStarted() { return future != null; @@ -98,4 +131,11 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R } } + final Runnable runForScheduler = new Runnable() { + @Override + public void run() { + executor.execute(ActiveMQScheduledComponent.this); + } + }; + } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java similarity index 72% rename from artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java rename to artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java index f7236e57d9..b2c3bf6a07 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java @@ -15,12 +15,18 @@ * limitations under the License. */ -package org.apache.activemq.artemis.tests.util; +package org.apache.activemq.artemis.utils; +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; +import java.util.ArrayList; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.jboss.logging.Logger; import org.junit.Assert; import org.junit.rules.ExternalResource; @@ -28,6 +34,8 @@ import org.junit.rules.ExternalResource; * This is useful to make sure you won't have leaking threads between tests */ public class ThreadLeakCheckRule extends ExternalResource { + private static Logger log = Logger.getLogger(ThreadLeakCheckRule.class); + private static Set knownThreads = new HashSet<>(); boolean enabled = true; @@ -68,7 +76,7 @@ public class ThreadLeakCheckRule extends ExternalResource { if (failed) { failedOnce = true; - ActiveMQTestBase.forceGC(); + forceGC(); try { Thread.sleep(500); } @@ -97,6 +105,61 @@ public class ThreadLeakCheckRule extends ExternalResource { } + private static int failedGCCalls = 0; + + public static void forceGC() { + + if (failedGCCalls >= 10) { + log.info("ignoring forceGC call since it seems System.gc is not working anyways"); + return; + } + log.info("#test forceGC"); + CountDownLatch finalized = new CountDownLatch(1); + WeakReference dumbReference = new WeakReference<>(new DumbReference(finalized)); + + long timeout = System.currentTimeMillis() + 1000; + + // A loop that will wait GC, using the minimal time as possible + while (!(dumbReference.get() == null && finalized.getCount() == 0) && System.currentTimeMillis() < timeout) { + System.gc(); + System.runFinalization(); + try { + finalized.await(100, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + } + } + + if (dumbReference.get() != null) { + failedGCCalls++; + log.info("It seems that GC is disabled at your VM"); + } + else { + // a success would reset the count + failedGCCalls = 0; + } + log.info("#test forceGC Done "); + } + + public static void forceGC(final Reference ref, final long timeout) { + long waitUntil = System.currentTimeMillis() + timeout; + // A loop that will wait GC, using the minimal time as possible + while (ref.get() != null && System.currentTimeMillis() < waitUntil) { + ArrayList list = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + list.add("Some string with garbage with concatenation " + i); + } + list.clear(); + list = null; + System.gc(); + try { + Thread.sleep(500); + } + catch (InterruptedException e) { + } + } + } + public static void removeKownThread(String name) { knownThreads.remove(name); } @@ -181,18 +244,10 @@ public class ThreadLeakCheckRule extends ExternalResource { //another netty thread return true; } - else if (threadName.contains("derby")) { - // The derby engine is initialized once, and lasts the lifetime of the VM - return true; - } else if (threadName.contains("Abandoned connection cleanup thread")) { // MySQL Engine checks for abandoned connections return true; } - else if (threadName.contains("Timer")) { - // The timer threads in Derby and JDBC use daemon and shutdown once user threads exit. - return true; - } else if (threadName.contains("hawtdispatch")) { // Static workers used by MQTT client. return true; @@ -213,4 +268,21 @@ public class ThreadLeakCheckRule extends ExternalResource { return false; } } + + + protected static class DumbReference { + + private CountDownLatch finalized; + + public DumbReference(CountDownLatch finalized) { + this.finalized = finalized; + } + + @Override + public void finalize() throws Throwable { + finalized.countDown(); + super.finalize(); + } + } + } diff --git a/artemis-jdbc-store/pom.xml b/artemis-jdbc-store/pom.xml index a888d4e51c..2e0c8c0416 100644 --- a/artemis-jdbc-store/pom.xml +++ b/artemis-jdbc-store/pom.xml @@ -40,6 +40,13 @@ provided true + + org.apache.activemq + artemis-commons + ${project.version} + test + test-jar + org.jboss.logging diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index fe5e69d4b1..51f3a3e25d 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -23,8 +23,10 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Timer; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.core.io.SequentialFileFactory; @@ -66,11 +68,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { private boolean started; - private Timer syncTimer; + private JDBCJournalSync syncTimer; + + private final Executor completeExecutor; private final Object journalLock = new Object(); - private final String timerThread; + private final ScheduledExecutorService scheduledExecutorService; // Track Tx Records private Map transactions = new ConcurrentHashMap<>(); @@ -78,17 +82,17 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { // Sequence ID for journal records private AtomicLong seq = new AtomicLong(0); - public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass) { + public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) { super(tableName, jdbcUrl, jdbcDriverClass); - timerThread = "Timer JDBC Journal(" + tableName + ")"; records = new ArrayList<>(); + this.scheduledExecutorService = scheduledExecutorService; + this.completeExecutor = completeExecutor; } @Override public void start() throws Exception { super.start(); - syncTimer = new Timer(timerThread, true); - syncTimer.schedule(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY); + syncTimer = new JDBCJournalSync(scheduledExecutorService, completeExecutor, SYNC_DELAY, TimeUnit.MILLISECONDS, this); started = true; } @@ -111,7 +115,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { public synchronized void stop() throws SQLException { if (started) { synchronized (journalLock) { - syncTimer.cancel(); sync(); started = false; super.stop(); @@ -129,9 +132,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { if (!started) return 0; - List recordRef = new ArrayList<>(); + List recordRef; synchronized (records) { - recordRef.addAll(records); + if (records.isEmpty()) { + return 0; + } + recordRef = new ArrayList<>(records); records.clear(); } @@ -271,14 +277,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } } }; - Thread t = new Thread(r); - t.start(); + completeExecutor.execute(r); } private void appendRecord(JDBCJournalRecord record) throws Exception { SimpleWaitIOCallback callback = null; - if (record.isSync() && record.getIoCompletion() == null) { + if (record.isSync() && record.getIoCompletion() == null && !record.isTransactional()) { callback = new SimpleWaitIOCallback(); record.setIoCompletion(callback); } @@ -293,6 +298,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } } + syncTimer.delay(); + if (callback != null) callback.waitCompletion(); } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java index a22462546c..53f07b8eb7 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java @@ -17,18 +17,28 @@ package org.apache.activemq.artemis.jdbc.store.journal; -import java.util.TimerTask; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; -public class JDBCJournalSync extends TimerTask { +import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; + +public class JDBCJournalSync extends ActiveMQScheduledComponent { private final JDBCJournalImpl journal; - public JDBCJournalSync(JDBCJournalImpl journal) { + public JDBCJournalSync(ScheduledExecutorService scheduledExecutorService, + Executor executor, + long checkPeriod, + TimeUnit timeUnit, + JDBCJournalImpl journal) { + super(scheduledExecutorService, executor, checkPeriod, timeUnit, true); this.journal = journal; } @Override public void run() { + super.run(); if (journal.isStarted()) { journal.sync(); } diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java index 2bdd729b22..8157e6f9f4 100644 --- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.jdbc.file; import java.nio.ByteBuffer; +import java.sql.DriverManager; import java.sql.SQLException; import java.util.HashSet; import java.util.List; @@ -34,9 +35,11 @@ import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.derby.jdbc.EmbeddedDriver; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; @@ -46,6 +49,9 @@ import static org.junit.Assert.fail; public class JDBCSequentialFileFactoryTest { + @Rule + public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule(); + private static String connectionUrl = "jdbc:derby:target/data;create=true"; private static String tableName = "FILES"; @@ -67,6 +73,15 @@ public class JDBCSequentialFileFactoryTest { factory.destroy(); } + @After + public void shutdownDerby() { + try { + DriverManager.getConnection("jdbc:derby:;shutdown=true"); + } + catch (Exception ignored) { + } + } + @Test public void testJDBCFileFactoryStarted() throws Exception { assertTrue(factory.isStarted()); diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java index 9369866325..c7e514b233 100644 --- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java @@ -16,11 +16,15 @@ */ package org.apache.activemq.artemis.jdbc.store.journal; +import java.sql.DriverManager; import java.util.ArrayList; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; +import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; +import org.junit.After; +import org.junit.Rule; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -28,6 +32,8 @@ import static org.junit.Assert.assertTrue; public class JDBCJournalLoaderCallbackTest { + @Rule + public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule(); @Test public void testAddDeleteRecord() throws Exception { @@ -46,4 +52,14 @@ public class JDBCJournalLoaderCallbackTest { cb.deleteRecord(record.id); assertTrue(committedRecords.isEmpty()); } + + @After + public void shutdownDerby() { + try { + DriverManager.getConnection("jdbc:derby:;shutdown=true"); + } + catch (Exception ignored) { + } + } + } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java index 7f98ec5a92..8c414550cf 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SimpleWaitIOCallback.java @@ -31,6 +31,9 @@ public final class SimpleWaitIOCallback extends SyncIOCompletion { private volatile int errorCode = 0; + public SimpleWaitIOCallback() { + } + @Override public String toString() { return SimpleWaitIOCallback.class.getName(); diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml index 034391d861..7f9b9db77f 100644 --- a/artemis-server/pom.xml +++ b/artemis-server/pom.xml @@ -110,6 +110,15 @@ + + + org.apache.activemq + artemis-commons + ${project.version} + test + test-jar + + diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java index bf90750b38..37cffb5966 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java @@ -18,17 +18,19 @@ package org.apache.activemq.artemis.core.paging.impl; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; /** * This will batch multiple calls waiting to perform a sync in a single call. */ -final class PageSyncTimer { +final class PageSyncTimer extends ActiveMQScheduledComponent { // Constants ----------------------------------------------------- @@ -55,7 +57,8 @@ final class PageSyncTimer { // Constructors -------------------------------------------------- - PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, long timeSync) { + PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, Executor executor, long timeSync) { + super(scheduledExecutor, executor, timeSync, TimeUnit.NANOSECONDS, true); this.store = store; this.scheduledExecutor = scheduledExecutor; this.timeSync = timeSync; @@ -68,12 +71,16 @@ final class PageSyncTimer { if (!pendingSync) { pendingSync = true; - // this is a single event - scheduledExecutor.schedule(runnable, timeSync, TimeUnit.NANOSECONDS); + delay(); } syncOperations.add(ctx); } + public void run() { + super.run(); + tick(); + } + private void tick() { OperationContext[] pendingSyncsArray; synchronized (this) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 356ea4539a..df603be7a4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -170,7 +170,7 @@ public class PagingStoreImpl implements PagingStore { this.syncNonTransactional = syncNonTransactional; if (scheduledExecutor != null && syncTimeout > 0) { - this.syncTimer = new PageSyncTimer(this, scheduledExecutor, syncTimeout); + this.syncTimer = new PageSyncTimer(this, scheduledExecutor, executor, syncTimeout); } else { this.syncTimer = null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 8fac43893d..6b75e74118 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -145,6 +146,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager { protected BatchingIDGenerator idGenerator; + protected final ScheduledExecutorService scheduledExecutorService; + protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true); protected Journal messageJournal; @@ -156,7 +159,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { /** * Used to create Operation Contexts */ - private final ExecutorFactory executorFactory; + protected final ExecutorFactory executorFactory; final Executor executor; @@ -181,17 +184,20 @@ public abstract class AbstractJournalStorageManager implements StorageManager { protected final Set largeMessagesToDelete = new HashSet<>(); - public AbstractJournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) { - this(config, executorFactory, null); + public AbstractJournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutorService) { + this(config, executorFactory, scheduledExecutorService, null); } public AbstractJournalStorageManager(Configuration config, ExecutorFactory executorFactory, + ScheduledExecutorService scheduledExecutorService, IOCriticalErrorListener criticalErrorListener) { this.executorFactory = executorFactory; this.ioCriticalErrorListener = criticalErrorListener; + this.scheduledExecutorService = scheduledExecutorService; + this.config = config; executor = executorFactory.getExecutor(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index 27fe5dcc91..70d824f204 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.config.Configuration; @@ -32,14 +33,17 @@ import org.apache.activemq.artemis.utils.ExecutorFactory; public class JDBCJournalStorageManager extends JournalStorageManager { - public JDBCJournalStorageManager(Configuration config, ExecutorFactory executorFactory) { - super(config, executorFactory); + public JDBCJournalStorageManager(Configuration config, + ExecutorFactory executorFactory, + ScheduledExecutorService scheduledExecutorService) { + super(config, executorFactory, scheduledExecutorService); } public JDBCJournalStorageManager(final Configuration config, - final ExecutorFactory executorFactory, - final IOCriticalErrorListener criticalErrorListener) { - super(config, executorFactory, criticalErrorListener); + final ScheduledExecutorService scheduledExecutorService, + final ExecutorFactory executorFactory, + final IOCriticalErrorListener criticalErrorListener) { + super(config, executorFactory, scheduledExecutorService, criticalErrorListener); } @Override @@ -47,16 +51,16 @@ public class JDBCJournalStorageManager extends JournalStorageManager { try { DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration(); - Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName()); + Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor()); bindingsJournal = localBindings; - Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName()); + Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor()); messageJournal = localMessage; bindingsJournal.start(); messageJournal.start(); - largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executor); + largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executorFactory.getExecutor()); largeMessagesFactory.start(); } catch (Exception e) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index d8c28d24ca..2aefbefc39 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -81,14 +82,25 @@ public class JournalStorageManager extends AbstractJournalStorageManager { private ReplicationManager replicator; + public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutorService) { + this(config, executorFactory, scheduledExecutorService, null); + } + public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) { - this(config, executorFactory, null); + this(config, executorFactory, null, null); + } + + public JournalStorageManager(final Configuration config, + final ExecutorFactory executorFactory, + final ScheduledExecutorService scheduledExecutorService, + final IOCriticalErrorListener criticalErrorListener) { + super(config, executorFactory, scheduledExecutorService, criticalErrorListener); } public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final IOCriticalErrorListener criticalErrorListener) { - super(config, executorFactory, criticalErrorListener); + super(config, executorFactory, null, criticalErrorListener); } @Override @@ -732,8 +744,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager { @Override public void injectMonitor(FileStoreMonitor monitor) throws Exception { - monitor.addStore(journalFF.getDirectory()); - monitor.addStore(largeMessagesFactory.getDirectory()); - monitor.addStore(bindingsFF.getDirectory()); + if (journalFF != null) { + monitor.addStore(journalFF.getDirectory()); + } + if (largeMessagesFactory != null) { + monitor.addStore(largeMessagesFactory.getDirectory()); + } + if (bindingsFF != null) { + monitor.addStore(bindingsFF.getDirectory()); + } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java index f75f6c6cdd..a5259bd56c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java @@ -23,6 +23,7 @@ import java.nio.file.FileStore; import java.nio.file.Files; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -44,10 +45,11 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { private double maxUsage; public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService, + Executor executor, long checkPeriod, TimeUnit timeUnit, double maxUsage) { - super(scheduledExecutorService, checkPeriod, timeUnit); + super(scheduledExecutorService, executor, checkPeriod, timeUnit, false); this.maxUsage = maxUsage; } @@ -57,7 +59,8 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { } public synchronized FileStoreMonitor addStore(File file) throws IOException { - if (file.exists()) { + // JDBC storage may return this as null, and we may need to ignore it + if (file != null && file.exists()) { addStore(Files.getFileStore(file.toPath())); } return this; @@ -70,6 +73,7 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { public void run() { + super.run(); tick(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 680af8a4dc..f5b9f26c49 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1796,11 +1796,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { private StorageManager createStorageManager() { if (configuration.isPersistenceEnabled()) { if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { - return new JDBCJournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO); + return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory, shutdownOnCriticalIO); } // Default to File Based Storage Manager, (Legacy default configuration). else { - return new JournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO); + return new JournalStorageManager(configuration, executorFactory, scheduledPool, shutdownOnCriticalIO); } } return new NullStorageManager(); @@ -1974,7 +1974,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration()); - this.reloadManager = new ReloadManagerImpl(getScheduledPool(), configuration.getConfigurationFileRefreshPeriod()); + this.reloadManager = new ReloadManagerImpl(getScheduledPool(), executorFactory.getExecutor(), configuration.getConfigurationFileRefreshPeriod()); if (configuration.getConfigurationUrl() != null && getScheduledPool() != null) { reloadManager.addCallback(configuration.getConfigurationUrl(), new ConfigurationFileReloader()); @@ -2055,7 +2055,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } try { - injectMonitor(new FileStoreMonitor(getScheduledPool(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f)); + injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f)); } catch (Exception e) { logger.warn(e.getMessage(), e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java index 7686ac5dfd..43fe54c25f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -37,11 +38,12 @@ public class ReloadManagerImpl extends ActiveMQScheduledComponent implements Rel private Map registry = new HashMap<>(); - public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService, long checkPeriod) { - super(scheduledExecutorService, checkPeriod, TimeUnit.MILLISECONDS); + public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService, Executor executor, long checkPeriod) { + super(scheduledExecutorService, executor, checkPeriod, TimeUnit.MILLISECONDS, false); } public void run() { + super.run(); tick(); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java index 181604f73e..e75ebc85ff 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/reload/ReloadManagerTest.java @@ -20,6 +20,8 @@ package org.apache.activemq.artemis.core.reload; import java.io.File; import java.io.IOException; import java.net.URL; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -37,18 +39,22 @@ public class ReloadManagerTest extends ActiveMQTestBase { private ScheduledExecutorService scheduledExecutorService; + private ExecutorService executorService; + private ReloadManagerImpl manager; @Before public void startScheduled() { scheduledExecutorService = new ScheduledThreadPoolExecutor(5); - manager = new ReloadManagerImpl(scheduledExecutorService, 100); + executorService = Executors.newSingleThreadExecutor(); + manager = new ReloadManagerImpl(scheduledExecutorService, executorService, 100); } @After public void stopScheduled() { manager.stop(); scheduledExecutorService.shutdown(); + executorService.shutdown(); scheduledExecutorService = null; } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java index 299f0bc238..b00efcab49 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java @@ -39,8 +39,8 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; -import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java index 9a47d05f50..7b5629fd68 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java @@ -23,6 +23,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; import java.nio.file.FileStore; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -39,16 +41,19 @@ import org.junit.Test; public class FileStoreMonitorTest extends ActiveMQTestBase { private ScheduledExecutorService scheduledExecutorService; + private ExecutorService executorService; @Before public void startScheduled() { scheduledExecutorService = new ScheduledThreadPoolExecutor(5); + executorService = Executors.newSingleThreadExecutor(); } @After public void stopScheduled() { scheduledExecutorService.shutdown(); scheduledExecutorService = null; + executorService.shutdown(); } @Test @@ -91,7 +96,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase { }; final AtomicBoolean fakeReturn = new AtomicBoolean(false); - FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, 100, TimeUnit.MILLISECONDS, 0.999) { + FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999) { @Override protected double calculateUsage(FileStore store) throws IOException { if (fakeReturn.get()) { @@ -123,7 +128,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase { @Test public void testScheduler() throws Exception { - FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, 20, TimeUnit.MILLISECONDS, 0.9); + FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9); final ReusableLatch latch = new ReusableLatch(5); storeMonitor.addStore(getTestDirfile()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index b5fe7b0aeb..9d727a7ffb 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -37,11 +37,11 @@ import java.io.OutputStream; import java.io.PrintWriter; import java.io.StringWriter; import java.lang.management.ManagementFactory; -import java.lang.ref.Reference; import java.lang.ref.WeakReference; import java.net.ServerSocket; import java.sql.Connection; import java.sql.Driver; +import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -139,6 +139,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.FileUtil; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.jboss.logging.Logger; import org.junit.After; @@ -218,6 +219,16 @@ public abstract class ActiveMQTestBase extends Assert { } }; + @After + public void shutdownDerby() { + try { + DriverManager.getConnection("jdbc:derby:;shutdown=true"); + } + catch (Exception ignored) { + } + } + + static { Random random = new Random(); DEFAULT_UDP_PORT = 6000 + random.nextInt(1000); @@ -550,60 +561,10 @@ public abstract class ActiveMQTestBase extends Assert { } } - private static int failedGCCalls = 0; - public static void forceGC() { - - if (failedGCCalls >= 10) { - log.info("ignoring forceGC call since it seems System.gc is not working anyways"); - return; - } - log.info("#test forceGC"); - CountDownLatch finalized = new CountDownLatch(1); - WeakReference dumbReference = new WeakReference<>(new DumbReference(finalized)); - - long timeout = System.currentTimeMillis() + 1000; - - // A loop that will wait GC, using the minimal time as possible - while (!(dumbReference.get() == null && finalized.getCount() == 0) && System.currentTimeMillis() < timeout) { - System.gc(); - System.runFinalization(); - try { - finalized.await(100, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) { - } - } - - if (dumbReference.get() != null) { - failedGCCalls++; - log.info("It seems that GC is disabled at your VM"); - } - else { - // a success would reset the count - failedGCCalls = 0; - } - log.info("#test forceGC Done "); + ThreadLeakCheckRule.forceGC(); } - public static void forceGC(final Reference ref, final long timeout) { - long waitUntil = System.currentTimeMillis() + timeout; - // A loop that will wait GC, using the minimal time as possible - while (ref.get() != null && System.currentTimeMillis() < waitUntil) { - ArrayList list = new ArrayList<>(); - for (int i = 0; i < 1000; i++) { - list.add("Some string with garbage with concatenation " + i); - } - list.clear(); - list = null; - System.gc(); - try { - Thread.sleep(500); - } - catch (InterruptedException e) { - } - } - } /** * Verifies whether weak references are released after a few GCs. @@ -2514,19 +2475,4 @@ public abstract class ActiveMQTestBase extends Assert { public static void waitForLatch(CountDownLatch latch) throws InterruptedException { assertTrue("Latch has got to return within a minute", latch.await(1, TimeUnit.MINUTES)); } - - protected static class DumbReference { - - private CountDownLatch finalized; - - public DumbReference(CountDownLatch finalized) { - this.finalized = finalized; - } - - @Override - public void finalize() throws Throwable { - finalized.countDown(); - super.finalize(); - } - } } diff --git a/tests/activemq5-unit-tests/pom.xml b/tests/activemq5-unit-tests/pom.xml index 2c7d411faa..6cafa8d22d 100644 --- a/tests/activemq5-unit-tests/pom.xml +++ b/tests/activemq5-unit-tests/pom.xml @@ -55,6 +55,13 @@ ${project.version} test-jar + + org.apache.activemq + artemis-commons + ${project.version} + test + test-jar + org.apache.activemq diff --git a/tests/extra-tests/pom.xml b/tests/extra-tests/pom.xml index 415dfc6d29..497b33ad70 100644 --- a/tests/extra-tests/pom.xml +++ b/tests/extra-tests/pom.xml @@ -105,6 +105,13 @@ test test-jar + + org.apache.activemq + artemis-commons + ${project.version} + test + test-jar + org.apache.activemq.tests unit-tests diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index 57b9171d54..c6f9834c27 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -53,6 +53,13 @@ test test-jar + + org.apache.activemq + artemis-commons + ${project.version} + test + test-jar + org.apache.activemq.tests unit-tests diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java index 5bf36e9ae6..234a9fb9c5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java @@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.BroadcastEndpoint; import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; -import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule; +import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.jgroups.JChannel; import org.jgroups.conf.PlainConfigurator; import org.junit.After; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java index 405acde9f7..fc3d9ff612 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java @@ -16,9 +16,14 @@ */ package org.apache.activemq.artemis.tests.integration.jdbc.store.journal; +import java.sql.DriverManager; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.journal.IOCompletion; @@ -26,7 +31,7 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule; +import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -45,10 +50,32 @@ public class JDBCJournalTest extends ActiveMQTestBase { private String jdbcUrl; + private ScheduledExecutorService scheduledExecutorService; + + private ExecutorService executorService; + + @After + @Override + public void tearDown() throws Exception { + journal.destroy(); + try { + DriverManager.getConnection("jdbc:derby:;shutdown=true"); + } + catch (Exception ignored) { + } + scheduledExecutorService.shutdown(); + scheduledExecutorService = null; + executorService.shutdown(); + executorService = null; + + } + @Before public void setup() throws Exception { + scheduledExecutorService = new ScheduledThreadPoolExecutor(5); + executorService = Executors.newSingleThreadExecutor(); jdbcUrl = "jdbc:derby:target/data;create=true"; - journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME, DRIVER_CLASS); + journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME, DRIVER_CLASS, scheduledExecutorService, executorService); journal.start(); } @@ -59,7 +86,6 @@ public class JDBCJournalTest extends ActiveMQTestBase { journal.appendAddRecord(i, (byte) 1, new byte[0], true); } - Thread.sleep(3000); assertEquals(noRecords, journal.getNumberOfRecords()); } @@ -122,9 +148,4 @@ public class JDBCJournalTest extends ActiveMQTestBase { assertEquals(noRecords + (noTxRecords * noTx), recordInfos.size()); } - @After - @Override - public void tearDown() throws Exception { - journal.destroy(); - } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java index f1b602f457..096e45d9ce 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java @@ -1634,7 +1634,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase { final ExecutorService deleteExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()); - final JournalStorageManager storage = new JournalStorageManager(config, factory, null); + final JournalStorageManager storage = new JournalStorageManager(config, factory); storage.start(); storage.loadInternalOnly(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java index de89e189a2..5e24d55a16 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java @@ -91,7 +91,7 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase { @Override protected JournalStorageManager createJournalStorageManager(Configuration configuration) { - return new JournalStorageManager(configuration, execFactory, null) { + return new JournalStorageManager(configuration, execFactory) { @Override public void deleteMessage(final long messageID) throws Exception { deletedMessage.add(messageID); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java index 15e96b2279..1c51b36b2b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java @@ -65,7 +65,7 @@ public class RestartSMTest extends ActiveMQTestBase { PostOffice postOffice = new FakePostOffice(); - final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory, null); + final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory); try { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java index 886cde3ab0..cdf67439be 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java @@ -20,6 +20,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.StoreConfiguration; @@ -47,6 +49,8 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase { protected ExecutorFactory execFactory; + protected ScheduledExecutorService scheduledExecutorService; + protected StorageManager journal; protected JMSStorageManager jmsJournal; @@ -73,6 +77,8 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase { super.setUp(); execFactory = getOrderedExecutor(); + + scheduledExecutorService = new ScheduledThreadPoolExecutor(5); } @Override @@ -103,6 +109,8 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase { jmsJournal = null; } + scheduledExecutorService.shutdown(); + destroyTables(Arrays.asList(new String[] {"MESSAGE", "BINDINGS", "LARGE_MESSAGE"})); super.tearDown(); if (exception != null) @@ -132,7 +140,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase { * @param configuration */ protected JournalStorageManager createJournalStorageManager(Configuration configuration) { - JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory, null); + JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory); addActiveMQComponent(jsm); return jsm; } @@ -141,7 +149,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase { * @param configuration */ protected JDBCJournalStorageManager createJDBCJournalStorageManager(Configuration configuration) { - JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, null); + JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, scheduledExecutorService); addActiveMQComponent(jsm); return jsm; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 6ff0cf041e..e05bdb20df 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -440,7 +440,7 @@ public final class ReplicationTest extends ActiveMQTestBase { * @throws Exception */ private JournalStorageManager getStorage() throws Exception { - return new JournalStorageManager(createDefaultInVMConfig(), factory, null); + return new JournalStorageManager(createDefaultInVMConfig(), factory); } /** diff --git a/tests/unit-tests/pom.xml b/tests/unit-tests/pom.xml index ef2d902c88..198231e5b6 100644 --- a/tests/unit-tests/pom.xml +++ b/tests/unit-tests/pom.xml @@ -38,6 +38,13 @@ test test-jar + + org.apache.activemq + artemis-commons + ${project.version} + test + test-jar + org.apache.activemq artemis-server diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java index d99b4f0da8..0b3fe59fae 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java @@ -92,7 +92,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()); - journal = new JournalStorageManager(configuration, factory, null); + journal = new JournalStorageManager(configuration, factory); journal.start(); journal.loadBindingJournal(new ArrayList(), new ArrayList()); @@ -112,7 +112,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { journal.stop(); - journal = new JournalStorageManager(configuration, factory, null); + journal = new JournalStorageManager(configuration, factory); journal.start(); journal.loadBindingJournal(new ArrayList(), new ArrayList()); @@ -135,7 +135,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { mapDups.clear(); - journal = new JournalStorageManager(configuration, factory, null); + journal = new JournalStorageManager(configuration, factory); journal.start(); journal.loadBindingJournal(new ArrayList(), new ArrayList());