ARTEMIS-727 Improving Thread usage on JDBC

https://issues.apache.org/jira/browse/ARTEMIS-727
This commit is contained in:
Clebert Suconic 2016-09-08 20:46:21 -04:00
parent 1a9c29c05b
commit f8278ec99c
34 changed files with 398 additions and 146 deletions

View File

@ -67,4 +67,22 @@
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<phase>test</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -17,9 +17,11 @@
package org.apache.activemq.artemis.core.server; package org.apache.activemq.artemis.core.server;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -30,14 +32,25 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
private long period; private long period;
private TimeUnit timeUnit; private TimeUnit timeUnit;
private final Executor executor;
private ScheduledFuture future; private ScheduledFuture future;
private final boolean onDemand;
private final AtomicInteger delayed = new AtomicInteger(0);
public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService, public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
Executor executor,
long checkPeriod, long checkPeriod,
TimeUnit timeUnit) { TimeUnit timeUnit,
boolean onDemand) {
this.executor = executor;
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
if (this.scheduledExecutorService == null) {
throw new NullPointerException("scheduled Executor is null");
}
this.period = checkPeriod; this.period = checkPeriod;
this.timeUnit = timeUnit; this.timeUnit = timeUnit;
this.onDemand = onDemand;
} }
@Override @Override
@ -45,14 +58,30 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
if (future != null) { if (future != null) {
return; return;
} }
if (onDemand) {
return;
}
if (period >= 0) { if (period >= 0) {
future = scheduledExecutorService.scheduleWithFixedDelay(this, period, period, timeUnit); future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, period, period, timeUnit);
} }
else { else {
logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period); logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
} }
} }
public void delay() {
int value = delayed.incrementAndGet();
if (value > 10) {
delayed.decrementAndGet();
}
else {
// We only schedule up to 10 periods upfront.
// this is to avoid a window where a current one would be running and a next one is coming.
// in theory just 2 would be enough. I'm using 10 as a precaution here.
scheduledExecutorService.schedule(runForScheduler, Math.min(period, period * value), timeUnit);
}
}
public long getPeriod() { public long getPeriod() {
return period; return period;
} }
@ -84,6 +113,10 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
} }
public void run() {
delayed.decrementAndGet();
}
@Override @Override
public synchronized boolean isStarted() { public synchronized boolean isStarted() {
return future != null; return future != null;
@ -98,4 +131,11 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
} }
} }
final Runnable runForScheduler = new Runnable() {
@Override
public void run() {
executor.execute(ActiveMQScheduledComponent.this);
}
};
} }

View File

@ -15,12 +15,18 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.artemis.tests.util; package org.apache.activemq.artemis.utils;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.jboss.logging.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.rules.ExternalResource; import org.junit.rules.ExternalResource;
@ -28,6 +34,8 @@ import org.junit.rules.ExternalResource;
* This is useful to make sure you won't have leaking threads between tests * This is useful to make sure you won't have leaking threads between tests
*/ */
public class ThreadLeakCheckRule extends ExternalResource { public class ThreadLeakCheckRule extends ExternalResource {
private static Logger log = Logger.getLogger(ThreadLeakCheckRule.class);
private static Set<String> knownThreads = new HashSet<>(); private static Set<String> knownThreads = new HashSet<>();
boolean enabled = true; boolean enabled = true;
@ -68,7 +76,7 @@ public class ThreadLeakCheckRule extends ExternalResource {
if (failed) { if (failed) {
failedOnce = true; failedOnce = true;
ActiveMQTestBase.forceGC(); forceGC();
try { try {
Thread.sleep(500); Thread.sleep(500);
} }
@ -97,6 +105,61 @@ public class ThreadLeakCheckRule extends ExternalResource {
} }
private static int failedGCCalls = 0;
public static void forceGC() {
if (failedGCCalls >= 10) {
log.info("ignoring forceGC call since it seems System.gc is not working anyways");
return;
}
log.info("#test forceGC");
CountDownLatch finalized = new CountDownLatch(1);
WeakReference<DumbReference> dumbReference = new WeakReference<>(new DumbReference(finalized));
long timeout = System.currentTimeMillis() + 1000;
// A loop that will wait GC, using the minimal time as possible
while (!(dumbReference.get() == null && finalized.getCount() == 0) && System.currentTimeMillis() < timeout) {
System.gc();
System.runFinalization();
try {
finalized.await(100, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
}
}
if (dumbReference.get() != null) {
failedGCCalls++;
log.info("It seems that GC is disabled at your VM");
}
else {
// a success would reset the count
failedGCCalls = 0;
}
log.info("#test forceGC Done ");
}
public static void forceGC(final Reference<?> ref, final long timeout) {
long waitUntil = System.currentTimeMillis() + timeout;
// A loop that will wait GC, using the minimal time as possible
while (ref.get() != null && System.currentTimeMillis() < waitUntil) {
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add("Some string with garbage with concatenation " + i);
}
list.clear();
list = null;
System.gc();
try {
Thread.sleep(500);
}
catch (InterruptedException e) {
}
}
}
public static void removeKownThread(String name) { public static void removeKownThread(String name) {
knownThreads.remove(name); knownThreads.remove(name);
} }
@ -181,18 +244,10 @@ public class ThreadLeakCheckRule extends ExternalResource {
//another netty thread //another netty thread
return true; return true;
} }
else if (threadName.contains("derby")) {
// The derby engine is initialized once, and lasts the lifetime of the VM
return true;
}
else if (threadName.contains("Abandoned connection cleanup thread")) { else if (threadName.contains("Abandoned connection cleanup thread")) {
// MySQL Engine checks for abandoned connections // MySQL Engine checks for abandoned connections
return true; return true;
} }
else if (threadName.contains("Timer")) {
// The timer threads in Derby and JDBC use daemon and shutdown once user threads exit.
return true;
}
else if (threadName.contains("hawtdispatch")) { else if (threadName.contains("hawtdispatch")) {
// Static workers used by MQTT client. // Static workers used by MQTT client.
return true; return true;
@ -213,4 +268,21 @@ public class ThreadLeakCheckRule extends ExternalResource {
return false; return false;
} }
} }
protected static class DumbReference {
private CountDownLatch finalized;
public DumbReference(CountDownLatch finalized) {
this.finalized = finalized;
}
@Override
public void finalize() throws Throwable {
finalized.countDown();
super.finalize();
}
}
} }

View File

@ -40,6 +40,13 @@
<scope>provided</scope> <scope>provided</scope>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency> <dependency>
<groupId>org.jboss.logging</groupId> <groupId>org.jboss.logging</groupId>

View File

@ -23,8 +23,10 @@ import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@ -66,11 +68,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
private boolean started; private boolean started;
private Timer syncTimer; private JDBCJournalSync syncTimer;
private final Executor completeExecutor;
private final Object journalLock = new Object(); private final Object journalLock = new Object();
private final String timerThread; private final ScheduledExecutorService scheduledExecutorService;
// Track Tx Records // Track Tx Records
private Map<Long, TransactionHolder> transactions = new ConcurrentHashMap<>(); private Map<Long, TransactionHolder> transactions = new ConcurrentHashMap<>();
@ -78,17 +82,17 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
// Sequence ID for journal records // Sequence ID for journal records
private AtomicLong seq = new AtomicLong(0); private AtomicLong seq = new AtomicLong(0);
public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass) { public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) {
super(tableName, jdbcUrl, jdbcDriverClass); super(tableName, jdbcUrl, jdbcDriverClass);
timerThread = "Timer JDBC Journal(" + tableName + ")";
records = new ArrayList<>(); records = new ArrayList<>();
this.scheduledExecutorService = scheduledExecutorService;
this.completeExecutor = completeExecutor;
} }
@Override @Override
public void start() throws Exception { public void start() throws Exception {
super.start(); super.start();
syncTimer = new Timer(timerThread, true); syncTimer = new JDBCJournalSync(scheduledExecutorService, completeExecutor, SYNC_DELAY, TimeUnit.MILLISECONDS, this);
syncTimer.schedule(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY);
started = true; started = true;
} }
@ -111,7 +115,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
public synchronized void stop() throws SQLException { public synchronized void stop() throws SQLException {
if (started) { if (started) {
synchronized (journalLock) { synchronized (journalLock) {
syncTimer.cancel();
sync(); sync();
started = false; started = false;
super.stop(); super.stop();
@ -129,9 +132,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
if (!started) if (!started)
return 0; return 0;
List<JDBCJournalRecord> recordRef = new ArrayList<>(); List<JDBCJournalRecord> recordRef;
synchronized (records) { synchronized (records) {
recordRef.addAll(records); if (records.isEmpty()) {
return 0;
}
recordRef = new ArrayList<>(records);
records.clear(); records.clear();
} }
@ -271,14 +277,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
} }
} }
}; };
Thread t = new Thread(r); completeExecutor.execute(r);
t.start();
} }
private void appendRecord(JDBCJournalRecord record) throws Exception { private void appendRecord(JDBCJournalRecord record) throws Exception {
SimpleWaitIOCallback callback = null; SimpleWaitIOCallback callback = null;
if (record.isSync() && record.getIoCompletion() == null) { if (record.isSync() && record.getIoCompletion() == null && !record.isTransactional()) {
callback = new SimpleWaitIOCallback(); callback = new SimpleWaitIOCallback();
record.setIoCompletion(callback); record.setIoCompletion(callback);
} }
@ -293,6 +298,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
} }
} }
syncTimer.delay();
if (callback != null) if (callback != null)
callback.waitCompletion(); callback.waitCompletion();
} }

View File

@ -17,18 +17,28 @@
package org.apache.activemq.artemis.jdbc.store.journal; package org.apache.activemq.artemis.jdbc.store.journal;
import java.util.TimerTask; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class JDBCJournalSync extends TimerTask { import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
public class JDBCJournalSync extends ActiveMQScheduledComponent {
private final JDBCJournalImpl journal; private final JDBCJournalImpl journal;
public JDBCJournalSync(JDBCJournalImpl journal) { public JDBCJournalSync(ScheduledExecutorService scheduledExecutorService,
Executor executor,
long checkPeriod,
TimeUnit timeUnit,
JDBCJournalImpl journal) {
super(scheduledExecutorService, executor, checkPeriod, timeUnit, true);
this.journal = journal; this.journal = journal;
} }
@Override @Override
public void run() { public void run() {
super.run();
if (journal.isStarted()) { if (journal.isStarted()) {
journal.sync(); journal.sync();
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jdbc.file; package org.apache.activemq.artemis.jdbc.file;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -34,9 +35,11 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.derby.jdbc.EmbeddedDriver; import org.apache.derby.jdbc.EmbeddedDriver;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
@ -46,6 +49,9 @@ import static org.junit.Assert.fail;
public class JDBCSequentialFileFactoryTest { public class JDBCSequentialFileFactoryTest {
@Rule
public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
private static String connectionUrl = "jdbc:derby:target/data;create=true"; private static String connectionUrl = "jdbc:derby:target/data;create=true";
private static String tableName = "FILES"; private static String tableName = "FILES";
@ -67,6 +73,15 @@ public class JDBCSequentialFileFactoryTest {
factory.destroy(); factory.destroy();
} }
@After
public void shutdownDerby() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
}
catch (Exception ignored) {
}
}
@Test @Test
public void testJDBCFileFactoryStarted() throws Exception { public void testJDBCFileFactoryStarted() throws Exception {
assertTrue(factory.isStarted()); assertTrue(factory.isStarted());

View File

@ -16,11 +16,15 @@
*/ */
package org.apache.activemq.artemis.jdbc.store.journal; package org.apache.activemq.artemis.jdbc.store.journal;
import java.sql.DriverManager;
import java.util.ArrayList; import java.util.ArrayList;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -28,6 +32,8 @@ import static org.junit.Assert.assertTrue;
public class JDBCJournalLoaderCallbackTest { public class JDBCJournalLoaderCallbackTest {
@Rule
public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
@Test @Test
public void testAddDeleteRecord() throws Exception { public void testAddDeleteRecord() throws Exception {
@ -46,4 +52,14 @@ public class JDBCJournalLoaderCallbackTest {
cb.deleteRecord(record.id); cb.deleteRecord(record.id);
assertTrue(committedRecords.isEmpty()); assertTrue(committedRecords.isEmpty());
} }
@After
public void shutdownDerby() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
}
catch (Exception ignored) {
}
}
} }

View File

@ -31,6 +31,9 @@ public final class SimpleWaitIOCallback extends SyncIOCompletion {
private volatile int errorCode = 0; private volatile int errorCode = 0;
public SimpleWaitIOCallback() {
}
@Override @Override
public String toString() { public String toString() {
return SimpleWaitIOCallback.class.getName(); return SimpleWaitIOCallback.class.getName();

View File

@ -110,6 +110,15 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>

View File

@ -18,17 +18,19 @@ package org.apache.activemq.artemis.core.paging.impl;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
/** /**
* This will batch multiple calls waiting to perform a sync in a single call. * This will batch multiple calls waiting to perform a sync in a single call.
*/ */
final class PageSyncTimer { final class PageSyncTimer extends ActiveMQScheduledComponent {
// Constants ----------------------------------------------------- // Constants -----------------------------------------------------
@ -55,7 +57,8 @@ final class PageSyncTimer {
// Constructors -------------------------------------------------- // Constructors --------------------------------------------------
PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, long timeSync) { PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, Executor executor, long timeSync) {
super(scheduledExecutor, executor, timeSync, TimeUnit.NANOSECONDS, true);
this.store = store; this.store = store;
this.scheduledExecutor = scheduledExecutor; this.scheduledExecutor = scheduledExecutor;
this.timeSync = timeSync; this.timeSync = timeSync;
@ -68,12 +71,16 @@ final class PageSyncTimer {
if (!pendingSync) { if (!pendingSync) {
pendingSync = true; pendingSync = true;
// this is a single event delay();
scheduledExecutor.schedule(runnable, timeSync, TimeUnit.NANOSECONDS);
} }
syncOperations.add(ctx); syncOperations.add(ctx);
} }
public void run() {
super.run();
tick();
}
private void tick() { private void tick() {
OperationContext[] pendingSyncsArray; OperationContext[] pendingSyncsArray;
synchronized (this) { synchronized (this) {

View File

@ -170,7 +170,7 @@ public class PagingStoreImpl implements PagingStore {
this.syncNonTransactional = syncNonTransactional; this.syncNonTransactional = syncNonTransactional;
if (scheduledExecutor != null && syncTimeout > 0) { if (scheduledExecutor != null && syncTimeout > 0) {
this.syncTimer = new PageSyncTimer(this, scheduledExecutor, syncTimeout); this.syncTimer = new PageSyncTimer(this, scheduledExecutor, executor, syncTimeout);
} }
else { else {
this.syncTimer = null; this.syncTimer = null;

View File

@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -145,6 +146,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
protected BatchingIDGenerator idGenerator; protected BatchingIDGenerator idGenerator;
protected final ScheduledExecutorService scheduledExecutorService;
protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true); protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
protected Journal messageJournal; protected Journal messageJournal;
@ -156,7 +159,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
/** /**
* Used to create Operation Contexts * Used to create Operation Contexts
*/ */
private final ExecutorFactory executorFactory; protected final ExecutorFactory executorFactory;
final Executor executor; final Executor executor;
@ -181,17 +184,20 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
protected final Set<Long> largeMessagesToDelete = new HashSet<>(); protected final Set<Long> largeMessagesToDelete = new HashSet<>();
public AbstractJournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) { public AbstractJournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutorService) {
this(config, executorFactory, null); this(config, executorFactory, scheduledExecutorService, null);
} }
public AbstractJournalStorageManager(Configuration config, public AbstractJournalStorageManager(Configuration config,
ExecutorFactory executorFactory, ExecutorFactory executorFactory,
ScheduledExecutorService scheduledExecutorService,
IOCriticalErrorListener criticalErrorListener) { IOCriticalErrorListener criticalErrorListener) {
this.executorFactory = executorFactory; this.executorFactory = executorFactory;
this.ioCriticalErrorListener = criticalErrorListener; this.ioCriticalErrorListener = criticalErrorListener;
this.scheduledExecutorService = scheduledExecutorService;
this.config = config; this.config = config;
executor = executorFactory.getExecutor(); executor = executorFactory.getExecutor();

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
@ -32,14 +33,17 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
public class JDBCJournalStorageManager extends JournalStorageManager { public class JDBCJournalStorageManager extends JournalStorageManager {
public JDBCJournalStorageManager(Configuration config, ExecutorFactory executorFactory) { public JDBCJournalStorageManager(Configuration config,
super(config, executorFactory); ExecutorFactory executorFactory,
ScheduledExecutorService scheduledExecutorService) {
super(config, executorFactory, scheduledExecutorService);
} }
public JDBCJournalStorageManager(final Configuration config, public JDBCJournalStorageManager(final Configuration config,
final ScheduledExecutorService scheduledExecutorService,
final ExecutorFactory executorFactory, final ExecutorFactory executorFactory,
final IOCriticalErrorListener criticalErrorListener) { final IOCriticalErrorListener criticalErrorListener) {
super(config, executorFactory, criticalErrorListener); super(config, executorFactory, scheduledExecutorService, criticalErrorListener);
} }
@Override @Override
@ -47,16 +51,16 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
try { try {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration(); DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName()); Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor());
bindingsJournal = localBindings; bindingsJournal = localBindings;
Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName()); Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor());
messageJournal = localMessage; messageJournal = localMessage;
bindingsJournal.start(); bindingsJournal.start();
messageJournal.start(); messageJournal.start();
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executor); largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executorFactory.getExecutor());
largeMessagesFactory.start(); largeMessagesFactory.start();
} }
catch (Exception e) { catch (Exception e) {

View File

@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -81,14 +82,25 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
private ReplicationManager replicator; private ReplicationManager replicator;
public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutorService) {
this(config, executorFactory, scheduledExecutorService, null);
}
public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) { public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) {
this(config, executorFactory, null); this(config, executorFactory, null, null);
}
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutorService,
final IOCriticalErrorListener criticalErrorListener) {
super(config, executorFactory, scheduledExecutorService, criticalErrorListener);
} }
public JournalStorageManager(final Configuration config, public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory, final ExecutorFactory executorFactory,
final IOCriticalErrorListener criticalErrorListener) { final IOCriticalErrorListener criticalErrorListener) {
super(config, executorFactory, criticalErrorListener); super(config, executorFactory, null, criticalErrorListener);
} }
@Override @Override
@ -732,8 +744,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
@Override @Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception { public void injectMonitor(FileStoreMonitor monitor) throws Exception {
if (journalFF != null) {
monitor.addStore(journalFF.getDirectory()); monitor.addStore(journalFF.getDirectory());
}
if (largeMessagesFactory != null) {
monitor.addStore(largeMessagesFactory.getDirectory()); monitor.addStore(largeMessagesFactory.getDirectory());
}
if (bindingsFF != null) {
monitor.addStore(bindingsFF.getDirectory()); monitor.addStore(bindingsFF.getDirectory());
} }
}
} }

View File

@ -23,6 +23,7 @@ import java.nio.file.FileStore;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -44,10 +45,11 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
private double maxUsage; private double maxUsage;
public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService, public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService,
Executor executor,
long checkPeriod, long checkPeriod,
TimeUnit timeUnit, TimeUnit timeUnit,
double maxUsage) { double maxUsage) {
super(scheduledExecutorService, checkPeriod, timeUnit); super(scheduledExecutorService, executor, checkPeriod, timeUnit, false);
this.maxUsage = maxUsage; this.maxUsage = maxUsage;
} }
@ -57,7 +59,8 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
} }
public synchronized FileStoreMonitor addStore(File file) throws IOException { public synchronized FileStoreMonitor addStore(File file) throws IOException {
if (file.exists()) { // JDBC storage may return this as null, and we may need to ignore it
if (file != null && file.exists()) {
addStore(Files.getFileStore(file.toPath())); addStore(Files.getFileStore(file.toPath()));
} }
return this; return this;
@ -70,6 +73,7 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
public void run() { public void run() {
super.run();
tick(); tick();
} }

View File

@ -1796,11 +1796,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private StorageManager createStorageManager() { private StorageManager createStorageManager() {
if (configuration.isPersistenceEnabled()) { if (configuration.isPersistenceEnabled()) {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
return new JDBCJournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO); return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory, shutdownOnCriticalIO);
} }
// Default to File Based Storage Manager, (Legacy default configuration). // Default to File Based Storage Manager, (Legacy default configuration).
else { else {
return new JournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO); return new JournalStorageManager(configuration, executorFactory, scheduledPool, shutdownOnCriticalIO);
} }
} }
return new NullStorageManager(); return new NullStorageManager();
@ -1974,7 +1974,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration()); deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
this.reloadManager = new ReloadManagerImpl(getScheduledPool(), configuration.getConfigurationFileRefreshPeriod()); this.reloadManager = new ReloadManagerImpl(getScheduledPool(), executorFactory.getExecutor(), configuration.getConfigurationFileRefreshPeriod());
if (configuration.getConfigurationUrl() != null && getScheduledPool() != null) { if (configuration.getConfigurationUrl() != null && getScheduledPool() != null) {
reloadManager.addCallback(configuration.getConfigurationUrl(), new ConfigurationFileReloader()); reloadManager.addCallback(configuration.getConfigurationUrl(), new ConfigurationFileReloader());
@ -2055,7 +2055,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
try { try {
injectMonitor(new FileStoreMonitor(getScheduledPool(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f)); injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f));
} }
catch (Exception e) { catch (Exception e) {
logger.warn(e.getMessage(), e); logger.warn(e.getMessage(), e);

View File

@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -37,11 +38,12 @@ public class ReloadManagerImpl extends ActiveMQScheduledComponent implements Rel
private Map<URL, ReloadRegistry> registry = new HashMap<>(); private Map<URL, ReloadRegistry> registry = new HashMap<>();
public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService, long checkPeriod) { public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService, Executor executor, long checkPeriod) {
super(scheduledExecutorService, checkPeriod, TimeUnit.MILLISECONDS); super(scheduledExecutorService, executor, checkPeriod, TimeUnit.MILLISECONDS, false);
} }
public void run() { public void run() {
super.run();
tick(); tick();
} }

View File

@ -20,6 +20,8 @@ package org.apache.activemq.artemis.core.reload;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URL; import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -37,18 +39,22 @@ public class ReloadManagerTest extends ActiveMQTestBase {
private ScheduledExecutorService scheduledExecutorService; private ScheduledExecutorService scheduledExecutorService;
private ExecutorService executorService;
private ReloadManagerImpl manager; private ReloadManagerImpl manager;
@Before @Before
public void startScheduled() { public void startScheduled() {
scheduledExecutorService = new ScheduledThreadPoolExecutor(5); scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
manager = new ReloadManagerImpl(scheduledExecutorService, 100); executorService = Executors.newSingleThreadExecutor();
manager = new ReloadManagerImpl(scheduledExecutorService, executorService, 100);
} }
@After @After
public void stopScheduled() { public void stopScheduled() {
manager.stop(); manager.stop();
scheduledExecutorService.shutdown(); scheduledExecutorService.shutdown();
executorService.shutdown();
scheduledExecutorService = null; scheduledExecutorService = null;
} }

View File

@ -39,8 +39,8 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;

View File

@ -23,6 +23,8 @@ import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.nio.file.FileStore; import java.nio.file.FileStore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -39,16 +41,19 @@ import org.junit.Test;
public class FileStoreMonitorTest extends ActiveMQTestBase { public class FileStoreMonitorTest extends ActiveMQTestBase {
private ScheduledExecutorService scheduledExecutorService; private ScheduledExecutorService scheduledExecutorService;
private ExecutorService executorService;
@Before @Before
public void startScheduled() { public void startScheduled() {
scheduledExecutorService = new ScheduledThreadPoolExecutor(5); scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
executorService = Executors.newSingleThreadExecutor();
} }
@After @After
public void stopScheduled() { public void stopScheduled() {
scheduledExecutorService.shutdown(); scheduledExecutorService.shutdown();
scheduledExecutorService = null; scheduledExecutorService = null;
executorService.shutdown();
} }
@Test @Test
@ -91,7 +96,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
}; };
final AtomicBoolean fakeReturn = new AtomicBoolean(false); final AtomicBoolean fakeReturn = new AtomicBoolean(false);
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, 100, TimeUnit.MILLISECONDS, 0.999) { FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999) {
@Override @Override
protected double calculateUsage(FileStore store) throws IOException { protected double calculateUsage(FileStore store) throws IOException {
if (fakeReturn.get()) { if (fakeReturn.get()) {
@ -123,7 +128,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
@Test @Test
public void testScheduler() throws Exception { public void testScheduler() throws Exception {
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, 20, TimeUnit.MILLISECONDS, 0.9); FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9);
final ReusableLatch latch = new ReusableLatch(5); final ReusableLatch latch = new ReusableLatch(5);
storeMonitor.addStore(getTestDirfile()); storeMonitor.addStore(getTestDirfile());

View File

@ -37,11 +37,11 @@ import java.io.OutputStream;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.sql.Connection; import java.sql.Connection;
import java.sql.Driver; import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
@ -139,6 +139,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.FileUtil; import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.junit.After; import org.junit.After;
@ -218,6 +219,16 @@ public abstract class ActiveMQTestBase extends Assert {
} }
}; };
@After
public void shutdownDerby() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
}
catch (Exception ignored) {
}
}
static { static {
Random random = new Random(); Random random = new Random();
DEFAULT_UDP_PORT = 6000 + random.nextInt(1000); DEFAULT_UDP_PORT = 6000 + random.nextInt(1000);
@ -550,60 +561,10 @@ public abstract class ActiveMQTestBase extends Assert {
} }
} }
private static int failedGCCalls = 0;
public static void forceGC() { public static void forceGC() {
ThreadLeakCheckRule.forceGC();
if (failedGCCalls >= 10) {
log.info("ignoring forceGC call since it seems System.gc is not working anyways");
return;
}
log.info("#test forceGC");
CountDownLatch finalized = new CountDownLatch(1);
WeakReference<DumbReference> dumbReference = new WeakReference<>(new DumbReference(finalized));
long timeout = System.currentTimeMillis() + 1000;
// A loop that will wait GC, using the minimal time as possible
while (!(dumbReference.get() == null && finalized.getCount() == 0) && System.currentTimeMillis() < timeout) {
System.gc();
System.runFinalization();
try {
finalized.await(100, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
}
} }
if (dumbReference.get() != null) {
failedGCCalls++;
log.info("It seems that GC is disabled at your VM");
}
else {
// a success would reset the count
failedGCCalls = 0;
}
log.info("#test forceGC Done ");
}
public static void forceGC(final Reference<?> ref, final long timeout) {
long waitUntil = System.currentTimeMillis() + timeout;
// A loop that will wait GC, using the minimal time as possible
while (ref.get() != null && System.currentTimeMillis() < waitUntil) {
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add("Some string with garbage with concatenation " + i);
}
list.clear();
list = null;
System.gc();
try {
Thread.sleep(500);
}
catch (InterruptedException e) {
}
}
}
/** /**
* Verifies whether weak references are released after a few GCs. * Verifies whether weak references are released after a few GCs.
@ -2514,19 +2475,4 @@ public abstract class ActiveMQTestBase extends Assert {
public static void waitForLatch(CountDownLatch latch) throws InterruptedException { public static void waitForLatch(CountDownLatch latch) throws InterruptedException {
assertTrue("Latch has got to return within a minute", latch.await(1, TimeUnit.MINUTES)); assertTrue("Latch has got to return within a minute", latch.await(1, TimeUnit.MINUTES));
} }
protected static class DumbReference {
private CountDownLatch finalized;
public DumbReference(CountDownLatch finalized) {
this.finalized = finalized;
}
@Override
public void finalize() throws Throwable {
finalized.countDown();
super.finalize();
}
}
} }

View File

@ -55,6 +55,13 @@
<version>${project.version}</version> <version>${project.version}</version>
<type>test-jar</type> <type>test-jar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>

View File

@ -105,6 +105,13 @@
<scope>test</scope> <scope>test</scope>
<type>test-jar</type> <type>test-jar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.activemq.tests</groupId> <groupId>org.apache.activemq.tests</groupId>
<artifactId>unit-tests</artifactId> <artifactId>unit-tests</artifactId>

View File

@ -53,6 +53,13 @@
<scope>test</scope> <scope>test</scope>
<type>test-jar</type> <type>test-jar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.activemq.tests</groupId> <groupId>org.apache.activemq.tests</groupId>
<artifactId>unit-tests</artifactId> <artifactId>unit-tests</artifactId>

View File

@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager; import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.jgroups.JChannel; import org.jgroups.JChannel;
import org.jgroups.conf.PlainConfigurator; import org.jgroups.conf.PlainConfigurator;
import org.junit.After; import org.junit.After;

View File

@ -16,9 +16,14 @@
*/ */
package org.apache.activemq.artemis.tests.integration.jdbc.store.journal; package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
import java.sql.DriverManager;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.IOCompletion;
@ -26,7 +31,7 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
@ -45,10 +50,32 @@ public class JDBCJournalTest extends ActiveMQTestBase {
private String jdbcUrl; private String jdbcUrl;
private ScheduledExecutorService scheduledExecutorService;
private ExecutorService executorService;
@After
@Override
public void tearDown() throws Exception {
journal.destroy();
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
}
catch (Exception ignored) {
}
scheduledExecutorService.shutdown();
scheduledExecutorService = null;
executorService.shutdown();
executorService = null;
}
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
executorService = Executors.newSingleThreadExecutor();
jdbcUrl = "jdbc:derby:target/data;create=true"; jdbcUrl = "jdbc:derby:target/data;create=true";
journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME, DRIVER_CLASS); journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME, DRIVER_CLASS, scheduledExecutorService, executorService);
journal.start(); journal.start();
} }
@ -59,7 +86,6 @@ public class JDBCJournalTest extends ActiveMQTestBase {
journal.appendAddRecord(i, (byte) 1, new byte[0], true); journal.appendAddRecord(i, (byte) 1, new byte[0], true);
} }
Thread.sleep(3000);
assertEquals(noRecords, journal.getNumberOfRecords()); assertEquals(noRecords, journal.getNumberOfRecords());
} }
@ -122,9 +148,4 @@ public class JDBCJournalTest extends ActiveMQTestBase {
assertEquals(noRecords + (noTxRecords * noTx), recordInfos.size()); assertEquals(noRecords + (noTxRecords * noTx), recordInfos.size());
} }
@After
@Override
public void tearDown() throws Exception {
journal.destroy();
}
} }

View File

@ -1634,7 +1634,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
final ExecutorService deleteExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()); final ExecutorService deleteExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
final JournalStorageManager storage = new JournalStorageManager(config, factory, null); final JournalStorageManager storage = new JournalStorageManager(config, factory);
storage.start(); storage.start();
storage.loadInternalOnly(); storage.loadInternalOnly();

View File

@ -91,7 +91,7 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
@Override @Override
protected JournalStorageManager createJournalStorageManager(Configuration configuration) { protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
return new JournalStorageManager(configuration, execFactory, null) { return new JournalStorageManager(configuration, execFactory) {
@Override @Override
public void deleteMessage(final long messageID) throws Exception { public void deleteMessage(final long messageID) throws Exception {
deletedMessage.add(messageID); deletedMessage.add(messageID);

View File

@ -65,7 +65,7 @@ public class RestartSMTest extends ActiveMQTestBase {
PostOffice postOffice = new FakePostOffice(); PostOffice postOffice = new FakePostOffice();
final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory, null); final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory);
try { try {

View File

@ -20,6 +20,8 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration;
@ -47,6 +49,8 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
protected ExecutorFactory execFactory; protected ExecutorFactory execFactory;
protected ScheduledExecutorService scheduledExecutorService;
protected StorageManager journal; protected StorageManager journal;
protected JMSStorageManager jmsJournal; protected JMSStorageManager jmsJournal;
@ -73,6 +77,8 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
super.setUp(); super.setUp();
execFactory = getOrderedExecutor(); execFactory = getOrderedExecutor();
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
} }
@Override @Override
@ -103,6 +109,8 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
jmsJournal = null; jmsJournal = null;
} }
scheduledExecutorService.shutdown();
destroyTables(Arrays.asList(new String[] {"MESSAGE", "BINDINGS", "LARGE_MESSAGE"})); destroyTables(Arrays.asList(new String[] {"MESSAGE", "BINDINGS", "LARGE_MESSAGE"}));
super.tearDown(); super.tearDown();
if (exception != null) if (exception != null)
@ -132,7 +140,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @param configuration * @param configuration
*/ */
protected JournalStorageManager createJournalStorageManager(Configuration configuration) { protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory, null); JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory);
addActiveMQComponent(jsm); addActiveMQComponent(jsm);
return jsm; return jsm;
} }
@ -141,7 +149,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @param configuration * @param configuration
*/ */
protected JDBCJournalStorageManager createJDBCJournalStorageManager(Configuration configuration) { protected JDBCJournalStorageManager createJDBCJournalStorageManager(Configuration configuration) {
JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, null); JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, scheduledExecutorService);
addActiveMQComponent(jsm); addActiveMQComponent(jsm);
return jsm; return jsm;
} }

View File

@ -440,7 +440,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
* @throws Exception * @throws Exception
*/ */
private JournalStorageManager getStorage() throws Exception { private JournalStorageManager getStorage() throws Exception {
return new JournalStorageManager(createDefaultInVMConfig(), factory, null); return new JournalStorageManager(createDefaultInVMConfig(), factory);
} }
/** /**

View File

@ -38,6 +38,13 @@
<scope>test</scope> <scope>test</scope>
<type>test-jar</type> <type>test-jar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId> <artifactId>artemis-server</artifactId>

View File

@ -92,7 +92,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()); ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
journal = new JournalStorageManager(configuration, factory, null); journal = new JournalStorageManager(configuration, factory);
journal.start(); journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@ -112,7 +112,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
journal.stop(); journal.stop();
journal = new JournalStorageManager(configuration, factory, null); journal = new JournalStorageManager(configuration, factory);
journal.start(); journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@ -135,7 +135,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
mapDups.clear(); mapDups.clear();
journal = new JournalStorageManager(configuration, factory, null); journal = new JournalStorageManager(configuration, factory);
journal.start(); journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());