This commit is contained in:
Clebert Suconic 2018-03-01 14:40:30 -05:00
commit 86c4596482
4 changed files with 53 additions and 26 deletions

View File

@ -20,6 +20,7 @@ import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public final class ActiveMQThreadFactory implements ThreadFactory {
@ -28,6 +29,8 @@ public final class ActiveMQThreadFactory implements ThreadFactory {
private final AtomicInteger threadCount = new AtomicInteger(0);
private final ReusableLatch active = new ReusableLatch(0);
private final int threadPriority;
private final boolean daemon;
@ -96,8 +99,28 @@ public final class ActiveMQThreadFactory implements ThreadFactory {
}
}
/** It will wait all threads to finish */
public boolean join(int timeout, TimeUnit timeUnit) {
try {
return active.await(timeout, timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
private Thread createThread(final Runnable command) {
final Thread t = new Thread(command, prefix + threadCount.getAndIncrement() + " (" + groupName + ")");
active.countUp();
final Thread t = new Thread(command, prefix + threadCount.getAndIncrement() + " (" + groupName + ")") {
@Override
public void run() {
try {
command.run();
} finally {
active.countDown();
}
}
};
t.setDaemon(daemon);
t.setPriority(threadPriority);
t.setContextClassLoader(tccl);

View File

@ -105,6 +105,17 @@ public class InVMConnector extends AbstractConnector {
public static synchronized void resetThreadPool() {
if (threadPoolExecutor != null) {
threadPoolExecutor.shutdownNow();
if (threadPoolExecutor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) threadPoolExecutor;
if (tp.getThreadFactory() instanceof ActiveMQThreadFactory) {
ActiveMQThreadFactory tf = (ActiveMQThreadFactory)tp.getThreadFactory();
if (!tf.join(10, TimeUnit.SECONDS)) {
// resetThreadPool is only used on tests.
// no need to use a logger method, this is just fine.
logger.warn("Thread pool is still busy. couldn't stop on time");
}
}
}
threadPoolExecutor = null;
}
}

View File

@ -18,8 +18,6 @@
package org.apache.activemq.artemis.tests.compatibility;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@ -41,12 +39,12 @@ import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_235;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/** This test will run a hornetq server with artemis clients
* and it will make sure that failover happens without any problems. */
/**
* This test will run a hornetq server with artemis clients
* and it will make sure that failover happens without any problems.
*/
@RunWith(Parameterized.class)
public class HQFailoverTest extends VersionedBaseTest {
@ -101,15 +99,10 @@ public class HQFailoverTest extends VersionedBaseTest {
assertTrue(latch.await(10, TimeUnit.SECONDS));
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
queue = session.createQueue("queue");
MessageConsumer consumer = session.createConsumer(queue);
// We should still be able to send more after failover.
// This test is to validate stuff can still work well after failover against hornetq
for (int i = 0; i < 10; i++) {
Message msg = consumer.receive(5000);
assertNotNull(msg);
producer.send(session.createTextMessage(textBody + i));
}
assertNull(consumer.receiveNoWait());
}
}

View File

@ -62,11 +62,11 @@ public class ReattachTest extends ActiveMQTestBase {
*/
@Test
public void testImmediateReattach() throws Exception {
final long retryInterval = 500;
final long retryInterval = 50;
final double retryMultiplier = 1d;
final int reconnectAttempts = 1;
final int reconnectAttempts = 10;
locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024);
@ -189,7 +189,7 @@ public class ReattachTest extends ActiveMQTestBase {
*/
@Test
public void testDelayedReattach() throws Exception {
final long retryInterval = 500;
final long retryInterval = 50;
final double retryMultiplier = 1d;
@ -264,13 +264,13 @@ public class ReattachTest extends ActiveMQTestBase {
// Test an async (e.g. pinger) failure coming in while a connection manager is already reconnecting
@Test
public void testAsyncFailureWhileReattaching() throws Exception {
final long retryInterval = 500;
final long retryInterval = 50;
final double retryMultiplier = 1d;
final int reconnectAttempts = 60;
final long asyncFailDelay = 2000;
final long asyncFailDelay = 200;
locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024);
@ -372,7 +372,7 @@ public class ReattachTest extends ActiveMQTestBase {
@Test
public void testReattachAttemptsFailsToReconnect() throws Exception {
final long retryInterval = 500;
final long retryInterval = 50;
final double retryMultiplier = 1d;
@ -539,7 +539,7 @@ public class ReattachTest extends ActiveMQTestBase {
@Test
public void testCreateSessionFailBeforeSendSeveralThreads() throws Throwable {
final long retryInterval = 500;
final long retryInterval = 50;
final double retryMultiplier = 1d;
@ -686,7 +686,7 @@ public class ReattachTest extends ActiveMQTestBase {
@Test
public void testReattachAttemptsSucceedsInReconnecting() throws Exception {
final long retryInterval = 500;
final long retryInterval = 50;
final double retryMultiplier = 1d;
@ -824,7 +824,7 @@ public class ReattachTest extends ActiveMQTestBase {
@Test
public void testExponentialBackoff() throws Exception {
final long retryInterval = 500;
final long retryInterval = 50;
final double retryMultiplier = 2d;
@ -891,13 +891,13 @@ public class ReattachTest extends ActiveMQTestBase {
@Test
public void testExponentialBackoffMaxRetryInterval() throws Exception {
final long retryInterval = 500;
final long retryInterval = 50;
final double retryMultiplier = 2d;
final int reconnectAttempts = 60;
final long maxRetryInterval = 1000;
final long maxRetryInterval = 100;
locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setMaxRetryInterval(maxRetryInterval).setConfirmationWindowSize(1024 * 1024);