ARTEMIS-1842 Always use Executor on Topology

This commit is contained in:
Clebert Suconic 2018-05-01 17:06:57 -04:00
parent fef6d5fc65
commit e8a1e43ea2
5 changed files with 35 additions and 19 deletions

View File

@ -16,9 +16,10 @@
*/ */
package org.apache.activemq.artemis.api.core.client; package org.apache.activemq.artemis.api.core.client;
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -797,5 +798,8 @@ public interface ServerLocator extends AutoCloseable {
String getOutgoingInterceptorList(); String getOutgoingInterceptorList();
boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService scheduledThreadPoolExecutor); boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPoolExecutor);
/** This will only instantiate internal objects such as the topology */
void initialize() throws ActiveMQException;
} }

View File

@ -68,6 +68,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.Actor; import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores; import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -111,7 +112,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private final StaticConnector staticConnector = new StaticConnector(); private final StaticConnector staticConnector = new StaticConnector();
private final Topology topology; private Topology topology;
private final Object topologyArrayGuard = new Object(); private final Object topologyArrayGuard = new Object();
@ -124,7 +125,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// if the system should shutdown the pool when shutting down // if the system should shutdown the pool when shutting down
private transient boolean shutdownPool; private transient boolean shutdownPool;
private transient ExecutorService threadPool; private transient Executor threadPool;
private transient ScheduledExecutorService scheduledThreadPool; private transient ScheduledExecutorService scheduledThreadPool;
@ -250,13 +251,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory); scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
} }
this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray); this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray);
} }
@Override @Override
public synchronized boolean setThreadPools(ExecutorService threadPool, public synchronized boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPool) {
ScheduledExecutorService scheduledThreadPool) {
if (threadPool == null || scheduledThreadPool == null) if (threadPool == null || scheduledThreadPool == null)
return false; return false;
@ -285,7 +284,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}); });
} }
private synchronized void initialise() throws ActiveMQException { @Override
public synchronized void initialize() throws ActiveMQException {
if (state == STATE.INITIALIZED) if (state == STATE.INITIALIZED)
return; return;
synchronized (stateGuard) { synchronized (stateGuard) {
@ -297,6 +297,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
setThreadPools(); setThreadPools();
topology.setExecutor(new OrderedExecutor(threadPool));
instantiateLoadBalancingPolicy(); instantiateLoadBalancingPolicy();
if (discoveryGroupConfiguration != null) { if (discoveryGroupConfiguration != null) {
@ -564,7 +566,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override @Override
public void start(Executor executor) throws Exception { public void start(Executor executor) throws Exception {
initialise(); initialize();
this.startExecutor = executor; this.startExecutor = executor;
@ -681,7 +683,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception { public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception {
assertOpen(); assertOpen();
initialise(); initialize();
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
@ -707,7 +709,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
boolean failoverOnInitialConnection) throws Exception { boolean failoverOnInitialConnection) throws Exception {
assertOpen(); assertOpen();
initialise(); initialize();
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
@ -744,7 +746,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
public ClientSessionFactory createSessionFactory() throws ActiveMQException { public ClientSessionFactory createSessionFactory() throws ActiveMQException {
assertOpen(); assertOpen();
initialise(); initialize();
flushTopology(); flushTopology();
@ -1389,10 +1391,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
if (shutdownPool) { if (shutdownPool) {
if (threadPool != null) { if (threadPool != null) {
threadPool.shutdown(); ExecutorService executorService = (ExecutorService) threadPool;
executorService.shutdown();
try { try {
if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS)) { if (!executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
ActiveMQClientLogger.LOGGER.timedOutWaitingForTermination(); ActiveMQClientLogger.LOGGER.timedOutWaitingForTermination();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -1662,7 +1665,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
public ClientSessionFactory connect(boolean skipWarnings) throws ActiveMQException { public ClientSessionFactory connect(boolean skipWarnings) throws ActiveMQException {
assertOpen(); assertOpen();
initialise(); initialize();
createConnectors(); createConnectors();

View File

@ -39,7 +39,7 @@ public final class Topology {
private final Set<ClusterTopologyListener> topologyListeners; private final Set<ClusterTopologyListener> topologyListeners;
private final Executor executor; private Executor executor;
/** /**
* Used to debug operations. * Used to debug operations.
@ -85,6 +85,11 @@ public final class Topology {
} }
} }
public Topology setExecutor(Executor executor) {
this.executor = executor;
return this;
}
/** /**
* It will remove all elements as if it haven't received anyone from the server. * It will remove all elements as if it haven't received anyone from the server.
*/ */

View File

@ -229,9 +229,7 @@ public class ClientThreadPoolsTest {
Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool"); Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool"); Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
Method initialise = ServerLocatorImpl.class.getDeclaredMethod("initialise"); serverLocator.initialize();
initialise.setAccessible(true);
initialise.invoke(serverLocator);
threadPoolField.setAccessible(true); threadPoolField.setAccessible(true);
scheduledThreadPoolField.setAccessible(true); scheduledThreadPoolField.setAccessible(true);

View File

@ -189,6 +189,12 @@ public class ClusterController implements ActiveMQComponent {
serverLocator.setInitialConnectAttempts(-1); serverLocator.setInitialConnectAttempts(-1);
//this is used for replication so need to use the server packet decoder //this is used for replication so need to use the server packet decoder
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool());
try {
serverLocator.initialize();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
locators.put(name, serverLocator); locators.put(name, serverLocator);
} }