NO-JIRA Speeding up ReattachTest

This commit is contained in:
Clebert Suconic 2018-02-28 17:07:00 -05:00
parent bdd2c09c58
commit e4e864d88e
3 changed files with 46 additions and 12 deletions

View File

@ -20,6 +20,7 @@ import java.security.AccessControlContext;
import java.security.AccessController; import java.security.AccessController;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
public final class ActiveMQThreadFactory implements ThreadFactory { public final class ActiveMQThreadFactory implements ThreadFactory {
@ -28,6 +29,8 @@ public final class ActiveMQThreadFactory implements ThreadFactory {
private final AtomicInteger threadCount = new AtomicInteger(0); private final AtomicInteger threadCount = new AtomicInteger(0);
private final ReusableLatch active = new ReusableLatch(0);
private final int threadPriority; private final int threadPriority;
private final boolean daemon; private final boolean daemon;
@ -96,8 +99,28 @@ public final class ActiveMQThreadFactory implements ThreadFactory {
} }
} }
/** It will wait all threads to finish */
public boolean join(int timeout, TimeUnit timeUnit) {
try {
return active.await(timeout, timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
private Thread createThread(final Runnable command) { private Thread createThread(final Runnable command) {
final Thread t = new Thread(command, prefix + threadCount.getAndIncrement() + " (" + groupName + ")"); active.countUp();
final Thread t = new Thread(command, prefix + threadCount.getAndIncrement() + " (" + groupName + ")") {
@Override
public void run() {
try {
command.run();
} finally {
active.countDown();
}
}
};
t.setDaemon(daemon); t.setDaemon(daemon);
t.setPriority(threadPriority); t.setPriority(threadPriority);
t.setContextClassLoader(tccl); t.setContextClassLoader(tccl);

View File

@ -105,6 +105,17 @@ public class InVMConnector extends AbstractConnector {
public static synchronized void resetThreadPool() { public static synchronized void resetThreadPool() {
if (threadPoolExecutor != null) { if (threadPoolExecutor != null) {
threadPoolExecutor.shutdownNow(); threadPoolExecutor.shutdownNow();
if (threadPoolExecutor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) threadPoolExecutor;
if (tp.getThreadFactory() instanceof ActiveMQThreadFactory) {
ActiveMQThreadFactory tf = (ActiveMQThreadFactory)tp.getThreadFactory();
if (!tf.join(10, TimeUnit.SECONDS)) {
// resetThreadPool is only used on tests.
// no need to use a logger method, this is just fine.
logger.warn("Thread pool is still busy. couldn't stop on time");
}
}
}
threadPoolExecutor = null; threadPoolExecutor = null;
} }
} }

View File

@ -62,11 +62,11 @@ public class ReattachTest extends ActiveMQTestBase {
*/ */
@Test @Test
public void testImmediateReattach() throws Exception { public void testImmediateReattach() throws Exception {
final long retryInterval = 500; final long retryInterval = 50;
final double retryMultiplier = 1d; final double retryMultiplier = 1d;
final int reconnectAttempts = 1; final int reconnectAttempts = 10;
locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024); locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024);
@ -189,7 +189,7 @@ public class ReattachTest extends ActiveMQTestBase {
*/ */
@Test @Test
public void testDelayedReattach() throws Exception { public void testDelayedReattach() throws Exception {
final long retryInterval = 500; final long retryInterval = 50;
final double retryMultiplier = 1d; final double retryMultiplier = 1d;
@ -264,13 +264,13 @@ public class ReattachTest extends ActiveMQTestBase {
// Test an async (e.g. pinger) failure coming in while a connection manager is already reconnecting // Test an async (e.g. pinger) failure coming in while a connection manager is already reconnecting
@Test @Test
public void testAsyncFailureWhileReattaching() throws Exception { public void testAsyncFailureWhileReattaching() throws Exception {
final long retryInterval = 500; final long retryInterval = 50;
final double retryMultiplier = 1d; final double retryMultiplier = 1d;
final int reconnectAttempts = 60; final int reconnectAttempts = 60;
final long asyncFailDelay = 2000; final long asyncFailDelay = 200;
locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024); locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024);
@ -372,7 +372,7 @@ public class ReattachTest extends ActiveMQTestBase {
@Test @Test
public void testReattachAttemptsFailsToReconnect() throws Exception { public void testReattachAttemptsFailsToReconnect() throws Exception {
final long retryInterval = 500; final long retryInterval = 50;
final double retryMultiplier = 1d; final double retryMultiplier = 1d;
@ -539,7 +539,7 @@ public class ReattachTest extends ActiveMQTestBase {
@Test @Test
public void testCreateSessionFailBeforeSendSeveralThreads() throws Throwable { public void testCreateSessionFailBeforeSendSeveralThreads() throws Throwable {
final long retryInterval = 500; final long retryInterval = 50;
final double retryMultiplier = 1d; final double retryMultiplier = 1d;
@ -686,7 +686,7 @@ public class ReattachTest extends ActiveMQTestBase {
@Test @Test
public void testReattachAttemptsSucceedsInReconnecting() throws Exception { public void testReattachAttemptsSucceedsInReconnecting() throws Exception {
final long retryInterval = 500; final long retryInterval = 50;
final double retryMultiplier = 1d; final double retryMultiplier = 1d;
@ -824,7 +824,7 @@ public class ReattachTest extends ActiveMQTestBase {
@Test @Test
public void testExponentialBackoff() throws Exception { public void testExponentialBackoff() throws Exception {
final long retryInterval = 500; final long retryInterval = 50;
final double retryMultiplier = 2d; final double retryMultiplier = 2d;
@ -891,13 +891,13 @@ public class ReattachTest extends ActiveMQTestBase {
@Test @Test
public void testExponentialBackoffMaxRetryInterval() throws Exception { public void testExponentialBackoffMaxRetryInterval() throws Exception {
final long retryInterval = 500; final long retryInterval = 50;
final double retryMultiplier = 2d; final double retryMultiplier = 2d;
final int reconnectAttempts = 60; final int reconnectAttempts = 60;
final long maxRetryInterval = 1000; final long maxRetryInterval = 100;
locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setMaxRetryInterval(maxRetryInterval).setConfirmationWindowSize(1024 * 1024); locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setMaxRetryInterval(maxRetryInterval).setConfirmationWindowSize(1024 * 1024);