diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java index 04bd1f63b6..38aa1a7f30 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java @@ -16,9 +16,10 @@ */ package org.apache.activemq.artemis.api.core.client; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -797,5 +798,8 @@ public interface ServerLocator extends AutoCloseable { String getOutgoingInterceptorList(); - boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService scheduledThreadPoolExecutor); + boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPoolExecutor); + + /** This will only instantiate internal objects such as the topology */ + void initialize() throws ActiveMQException; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index fab1523245..972c9c7178 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -68,6 +68,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.actors.Actor; +import org.apache.activemq.artemis.utils.actors.OrderedExecutor; import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores; import org.jboss.logging.Logger; @@ -111,7 +112,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private final StaticConnector staticConnector = new StaticConnector(); - private final Topology topology; + private Topology topology; private final Object topologyArrayGuard = new Object(); @@ -124,7 +125,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery // if the system should shutdown the pool when shutting down private transient boolean shutdownPool; - private transient ExecutorService threadPool; + private transient Executor threadPool; private transient ScheduledExecutorService scheduledThreadPool; @@ -250,13 +251,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory); } - this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray); } @Override - public synchronized boolean setThreadPools(ExecutorService threadPool, - ScheduledExecutorService scheduledThreadPool) { + public synchronized boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPool) { if (threadPool == null || scheduledThreadPool == null) return false; @@ -285,7 +284,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery }); } - private synchronized void initialise() throws ActiveMQException { + @Override + public synchronized void initialize() throws ActiveMQException { if (state == STATE.INITIALIZED) return; synchronized (stateGuard) { @@ -297,6 +297,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery setThreadPools(); + topology.setExecutor(new OrderedExecutor(threadPool)); + instantiateLoadBalancingPolicy(); if (discoveryGroupConfiguration != null) { @@ -564,7 +566,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery @Override public void start(Executor executor) throws Exception { - initialise(); + initialize(); this.startExecutor = executor; @@ -681,7 +683,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception { assertOpen(); - initialise(); + initialize(); ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); @@ -707,7 +709,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery boolean failoverOnInitialConnection) throws Exception { assertOpen(); - initialise(); + initialize(); ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); @@ -744,7 +746,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public ClientSessionFactory createSessionFactory() throws ActiveMQException { assertOpen(); - initialise(); + initialize(); flushTopology(); @@ -1389,10 +1391,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (shutdownPool) { if (threadPool != null) { - threadPool.shutdown(); + ExecutorService executorService = (ExecutorService) threadPool; + executorService.shutdown(); try { - if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS)) { + if (!executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)) { ActiveMQClientLogger.LOGGER.timedOutWaitingForTermination(); } } catch (InterruptedException e) { @@ -1662,7 +1665,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public ClientSessionFactory connect(boolean skipWarnings) throws ActiveMQException { assertOpen(); - initialise(); + initialize(); createConnectors(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java index 5db3e6b948..3b1c64ee71 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java @@ -39,7 +39,7 @@ public final class Topology { private final Set topologyListeners; - private final Executor executor; + private Executor executor; /** * Used to debug operations. @@ -85,6 +85,11 @@ public final class Topology { } } + public Topology setExecutor(Executor executor) { + this.executor = executor; + return this; + } + /** * It will remove all elements as if it haven't received anyone from the server. */ diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java index ad6363e376..77271f464f 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java @@ -229,9 +229,7 @@ public class ClientThreadPoolsTest { Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool"); Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool"); - Method initialise = ServerLocatorImpl.class.getDeclaredMethod("initialise"); - initialise.setAccessible(true); - initialise.invoke(serverLocator); + serverLocator.initialize(); threadPoolField.setAccessible(true); scheduledThreadPoolField.setAccessible(true); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index c77b297019..03eb2432ac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -189,6 +189,12 @@ public class ClusterController implements ActiveMQComponent { serverLocator.setInitialConnectAttempts(-1); //this is used for replication so need to use the server packet decoder serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); + serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool()); + try { + serverLocator.initialize(); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } locators.put(name, serverLocator); }