diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java index 04bd1f63b6..38aa1a7f30 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java @@ -16,9 +16,10 @@ */ package org.apache.activemq.artemis.api.core.client; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -797,5 +798,8 @@ public interface ServerLocator extends AutoCloseable { String getOutgoingInterceptorList(); - boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService scheduledThreadPoolExecutor); + boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPoolExecutor); + + /** This will only instantiate internal objects such as the topology */ + void initialize() throws ActiveMQException; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 978cc39bc8..972c9c7178 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -68,6 +68,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.actors.Actor; +import org.apache.activemq.artemis.utils.actors.OrderedExecutor; import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores; import org.jboss.logging.Logger; @@ -111,7 +112,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private final StaticConnector staticConnector = new StaticConnector(); - private final Topology topology; + private Topology topology; private final Object topologyArrayGuard = new Object(); @@ -124,7 +125,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery // if the system should shutdown the pool when shutting down private transient boolean shutdownPool; - private transient ExecutorService threadPool; + private transient Executor threadPool; private transient ScheduledExecutorService scheduledThreadPool; @@ -214,10 +215,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private final Exception traceException = new Exception(); - // To be called when there are ServerLocator being finalized. - // To be used on test assertions - public static Runnable finalizeCallback = null; - public static synchronized void clearThreadPools() { ActiveMQClient.clearThreadPools(); } @@ -254,13 +251,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory); } - this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray); } @Override - public synchronized boolean setThreadPools(ExecutorService threadPool, - ScheduledExecutorService scheduledThreadPool) { + public synchronized boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPool) { if (threadPool == null || scheduledThreadPool == null) return false; @@ -289,7 +284,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery }); } - private synchronized void initialise() throws ActiveMQException { + @Override + public synchronized void initialize() throws ActiveMQException { if (state == STATE.INITIALIZED) return; synchronized (stateGuard) { @@ -301,6 +297,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery setThreadPools(); + topology.setExecutor(new OrderedExecutor(threadPool)); + instantiateLoadBalancingPolicy(); if (discoveryGroupConfiguration != null) { @@ -568,7 +566,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery @Override public void start(Executor executor) throws Exception { - initialise(); + initialize(); this.startExecutor = executor; @@ -685,7 +683,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception { assertOpen(); - initialise(); + initialize(); ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); @@ -711,7 +709,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery boolean failoverOnInitialConnection) throws Exception { assertOpen(); - initialise(); + initialize(); ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); @@ -748,7 +746,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public ClientSessionFactory createSessionFactory() throws ActiveMQException { assertOpen(); - initialise(); + initialize(); flushTopology(); @@ -1393,10 +1391,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (shutdownPool) { if (threadPool != null) { - threadPool.shutdown(); + ExecutorService executorService = (ExecutorService) threadPool; + executorService.shutdown(); try { - if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS)) { + if (!executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)) { ActiveMQClientLogger.LOGGER.timedOutWaitingForTermination(); } } catch (InterruptedException e) { @@ -1666,7 +1665,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public ClientSessionFactory connect(boolean skipWarnings) throws ActiveMQException { assertOpen(); - initialise(); + initialize(); createConnectors(); @@ -1784,10 +1783,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (!isClosed() && finalizeCheck) { ActiveMQClientLogger.LOGGER.serverLocatorNotClosed(traceException, System.identityHashCode(this)); - if (ServerLocatorImpl.finalizeCallback != null) { - ServerLocatorImpl.finalizeCallback.run(); - } - close(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java index 5db3e6b948..3b1c64ee71 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java @@ -39,7 +39,7 @@ public final class Topology { private final Set topologyListeners; - private final Executor executor; + private Executor executor; /** * Used to debug operations. @@ -85,6 +85,11 @@ public final class Topology { } } + public Topology setExecutor(Executor executor) { + this.executor = executor; + return this; + } + /** * It will remove all elements as if it haven't received anyone from the server. */ diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java index ad6363e376..77271f464f 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java @@ -229,9 +229,7 @@ public class ClientThreadPoolsTest { Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool"); Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool"); - Method initialise = ServerLocatorImpl.class.getDeclaredMethod("initialise"); - initialise.setAccessible(true); - initialise.invoke(serverLocator); + serverLocator.initialize(); threadPoolField.setAccessible(true); scheduledThreadPoolField.setAccessible(true); diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java index ede1b01ede..e52735260d 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java @@ -390,24 +390,8 @@ public class ActiveMQActivation { } } - Thread threadTearDown = new Thread("TearDown/ActiveMQActivation") { - @Override - public void run() { - for (ActiveMQMessageHandler handler : handlersCopy) { - handler.teardown(); - } - } - }; - - // We will first start a new thread that will call tearDown on all the instances, trying to graciously shutdown everything. - // We will then use the call-timeout to determine a timeout. - // if that failed we will then close the connection factory, and interrupt the thread - threadTearDown.start(); - - try { - threadTearDown.join(timeout); - } catch (InterruptedException e) { - // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up + for (ActiveMQMessageHandler handler : handlersCopy) { + handler.teardown(); } if (factory != null) { @@ -421,20 +405,6 @@ public class ActiveMQActivation { factory = null; } - if (threadTearDown.isAlive()) { - threadTearDown.interrupt(); - - try { - threadTearDown.join(5000); - } catch (InterruptedException e) { - // nothing to be done here.. we are going down anyways - } - - if (threadTearDown.isAlive()) { - ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString()); - } - } - nodes.clear(); lastReceived = false; @@ -548,9 +518,7 @@ public class ActiveMQActivation { calculatedDestinationName = spec.getQueuePrefix() + calculatedDestinationName; } - logger.debug("Unable to retrieve " + destinationName + - " from JNDI. Creating a new " + destinationType.getName() + - " named " + calculatedDestinationName + " to be used by the MDB."); + logger.debug("Unable to retrieve " + destinationName + " from JNDI. Creating a new " + destinationType.getName() + " named " + calculatedDestinationName + " to be used by the MDB."); // If there is no binding on naming, we will just create a new instance if (isTopic) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index c77b297019..03eb2432ac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -189,6 +189,12 @@ public class ClusterController implements ActiveMQComponent { serverLocator.setInitialConnectAttempts(-1); //this is used for replication so need to use the server packet decoder serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); + serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool()); + try { + serverLocator.initialize(); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } locators.put(name, serverLocator); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java index d8f0d73418..dd1248df48 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java @@ -64,6 +64,8 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener private final NetworkHealthCheck networkHealthCheck; + private boolean stopped = false; + /** * This is a safety net in case the live sends the first {@link ReplicationLiveIsStoppingMessage} * with code {@link org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#STOP_CALLED} and crashes before sending the second with @@ -266,6 +268,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener * @param explicitSignal the state we want to set the quorum manager to return */ public synchronized void causeExit(BACKUP_ACTIVATION explicitSignal) { + stopped = true; removeListener(); this.signal = explicitSignal; latch.countDown(); @@ -287,7 +290,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize; synchronized (voteGuard) { - while (!decision && voteAttempts++ < voteRetries) { + while (!stopped && voteAttempts++ < voteRetries) { //the live is dead so lets vote for quorum QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index f51d0f8236..ce6dba1255 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -311,6 +311,7 @@ public abstract class ActiveMQTestBase extends Assert { for (Exception exception : exceptions) { exception.printStackTrace(System.out); } + System.out.println(threadDump("Thread dump with reconnects happening")); fail("Client Session Factories still trying to reconnect, see above to see where created"); } Map threadMap = Thread.getAllStackTraces(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/CloseConnectionFactoryOnGCTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/CloseConnectionFactoryOnGCTest.java deleted file mode 100644 index 3a445d89d3..0000000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/CloseConnectionFactoryOnGCTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.jms.connection; - -import javax.jms.Connection; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; -import org.apache.activemq.artemis.api.jms.JMSFactoryType; -import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; -import org.apache.activemq.artemis.tests.util.JMSTestBase; -import org.junit.Test; - -/** - * A CloseConnectionOnGCTest - */ -public class CloseConnectionFactoryOnGCTest extends JMSTestBase { - - @Test(timeout = 60000) - public void testCloseCFOnGC() throws Exception { - - final AtomicInteger valueGC = new AtomicInteger(0); - - ServerLocatorImpl.finalizeCallback = new Runnable() { - @Override - public void run() { - valueGC.incrementAndGet(); - } - }; - - try { - // System.setOut(out); - for (int i = 0; i < 100; i++) { - ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY)); - Connection conn = cf.createConnection(); - cf = null; - conn.close(); - conn = null; - } - forceGC(); - } finally { - ServerLocatorImpl.finalizeCallback = null; - } - - assertEquals("The code is throwing exceptions", 0, valueGC.get()); - - } -} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerXATest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerXATest.java index db4db9bf2c..e2a2854679 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerXATest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerXATest.java @@ -33,7 +33,6 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; -import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.junit.Test; @@ -147,11 +146,6 @@ public class ActiveMQMessageHandlerXATest extends ActiveMQRATestBase { assertTrue(endpoint.interrupted); assertNotNull(endpoint.lastMessage); assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring"); - - Binding binding = server.getPostOffice().getBinding(MDBQUEUEPREFIXEDSIMPLE); - Wait.waitFor(() -> getMessageCount((Queue) binding.getBindable()) == 1); - long messageCount = getMessageCount((Queue) binding.getBindable()); - assertEquals(1, messageCount); } @Test