diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadFactory.java index 1644715cc4..419daa28b2 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadFactory.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadFactory.java @@ -20,6 +20,7 @@ import java.security.AccessControlContext; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public final class ActiveMQThreadFactory implements ThreadFactory { @@ -28,6 +29,8 @@ public final class ActiveMQThreadFactory implements ThreadFactory { private final AtomicInteger threadCount = new AtomicInteger(0); + private final ReusableLatch active = new ReusableLatch(0); + private final int threadPriority; private final boolean daemon; @@ -96,8 +99,28 @@ public final class ActiveMQThreadFactory implements ThreadFactory { } } + /** It will wait all threads to finish */ + public boolean join(int timeout, TimeUnit timeUnit) { + try { + return active.await(timeout, timeUnit); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + private Thread createThread(final Runnable command) { - final Thread t = new Thread(command, prefix + threadCount.getAndIncrement() + " (" + groupName + ")"); + active.countUp(); + final Thread t = new Thread(command, prefix + threadCount.getAndIncrement() + " (" + groupName + ")") { + @Override + public void run() { + try { + command.run(); + } finally { + active.countDown(); + } + } + }; t.setDaemon(daemon); t.setPriority(threadPriority); t.setContextClassLoader(tccl); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java index e635bc46b7..5693d35a08 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java @@ -105,6 +105,17 @@ public class InVMConnector extends AbstractConnector { public static synchronized void resetThreadPool() { if (threadPoolExecutor != null) { threadPoolExecutor.shutdownNow(); + if (threadPoolExecutor instanceof ThreadPoolExecutor) { + ThreadPoolExecutor tp = (ThreadPoolExecutor) threadPoolExecutor; + if (tp.getThreadFactory() instanceof ActiveMQThreadFactory) { + ActiveMQThreadFactory tf = (ActiveMQThreadFactory)tp.getThreadFactory(); + if (!tf.join(10, TimeUnit.SECONDS)) { + // resetThreadPool is only used on tests. + // no need to use a logger method, this is just fine. + logger.warn("Thread pool is still busy. couldn't stop on time"); + } + } + } threadPoolExecutor = null; } } diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java index 6bdf664b99..d3bdaf1fb8 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQFailoverTest.java @@ -18,8 +18,6 @@ package org.apache.activemq.artemis.tests.compatibility; import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; @@ -41,12 +39,12 @@ import org.junit.runners.Parameterized; import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_235; import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -/** This test will run a hornetq server with artemis clients - * and it will make sure that failover happens without any problems. */ +/** + * This test will run a hornetq server with artemis clients + * and it will make sure that failover happens without any problems. + */ @RunWith(Parameterized.class) public class HQFailoverTest extends VersionedBaseTest { @@ -101,15 +99,10 @@ public class HQFailoverTest extends VersionedBaseTest { assertTrue(latch.await(10, TimeUnit.SECONDS)); - session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - conn.start(); - queue = session.createQueue("queue"); - MessageConsumer consumer = session.createConsumer(queue); + // We should still be able to send more after failover. + // This test is to validate stuff can still work well after failover against hornetq for (int i = 0; i < 10; i++) { - Message msg = consumer.receive(5000); - assertNotNull(msg); + producer.send(session.createTextMessage(textBody + i)); } - assertNull(consumer.receiveNoWait()); - } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java index dbe6bcb56d..a20bf74062 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java @@ -62,11 +62,11 @@ public class ReattachTest extends ActiveMQTestBase { */ @Test public void testImmediateReattach() throws Exception { - final long retryInterval = 500; + final long retryInterval = 50; final double retryMultiplier = 1d; - final int reconnectAttempts = 1; + final int reconnectAttempts = 10; locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024); @@ -189,7 +189,7 @@ public class ReattachTest extends ActiveMQTestBase { */ @Test public void testDelayedReattach() throws Exception { - final long retryInterval = 500; + final long retryInterval = 50; final double retryMultiplier = 1d; @@ -264,13 +264,13 @@ public class ReattachTest extends ActiveMQTestBase { // Test an async (e.g. pinger) failure coming in while a connection manager is already reconnecting @Test public void testAsyncFailureWhileReattaching() throws Exception { - final long retryInterval = 500; + final long retryInterval = 50; final double retryMultiplier = 1d; final int reconnectAttempts = 60; - final long asyncFailDelay = 2000; + final long asyncFailDelay = 200; locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024 * 1024); @@ -372,7 +372,7 @@ public class ReattachTest extends ActiveMQTestBase { @Test public void testReattachAttemptsFailsToReconnect() throws Exception { - final long retryInterval = 500; + final long retryInterval = 50; final double retryMultiplier = 1d; @@ -539,7 +539,7 @@ public class ReattachTest extends ActiveMQTestBase { @Test public void testCreateSessionFailBeforeSendSeveralThreads() throws Throwable { - final long retryInterval = 500; + final long retryInterval = 50; final double retryMultiplier = 1d; @@ -686,7 +686,7 @@ public class ReattachTest extends ActiveMQTestBase { @Test public void testReattachAttemptsSucceedsInReconnecting() throws Exception { - final long retryInterval = 500; + final long retryInterval = 50; final double retryMultiplier = 1d; @@ -824,7 +824,7 @@ public class ReattachTest extends ActiveMQTestBase { @Test public void testExponentialBackoff() throws Exception { - final long retryInterval = 500; + final long retryInterval = 50; final double retryMultiplier = 2d; @@ -891,13 +891,13 @@ public class ReattachTest extends ActiveMQTestBase { @Test public void testExponentialBackoffMaxRetryInterval() throws Exception { - final long retryInterval = 500; + final long retryInterval = 50; final double retryMultiplier = 2d; final int reconnectAttempts = 60; - final long maxRetryInterval = 1000; + final long maxRetryInterval = 100; locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setMaxRetryInterval(maxRetryInterval).setConfirmationWindowSize(1024 * 1024);