This commit is contained in:
Clebert Suconic 2018-05-02 11:38:50 -04:00
commit 13fac86082
10 changed files with 43 additions and 133 deletions

View File

@ -16,9 +16,10 @@
*/
package org.apache.activemq.artemis.api.core.client;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -797,5 +798,8 @@ public interface ServerLocator extends AutoCloseable {
String getOutgoingInterceptorList();
boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService scheduledThreadPoolExecutor);
boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPoolExecutor);
/** This will only instantiate internal objects such as the topology */
void initialize() throws ActiveMQException;
}

View File

@ -68,6 +68,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores;
import org.jboss.logging.Logger;
@ -111,7 +112,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private final StaticConnector staticConnector = new StaticConnector();
private final Topology topology;
private Topology topology;
private final Object topologyArrayGuard = new Object();
@ -124,7 +125,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// if the system should shutdown the pool when shutting down
private transient boolean shutdownPool;
private transient ExecutorService threadPool;
private transient Executor threadPool;
private transient ScheduledExecutorService scheduledThreadPool;
@ -214,10 +215,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private final Exception traceException = new Exception();
// To be called when there are ServerLocator being finalized.
// To be used on test assertions
public static Runnable finalizeCallback = null;
public static synchronized void clearThreadPools() {
ActiveMQClient.clearThreadPools();
}
@ -254,13 +251,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
}
this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray);
}
@Override
public synchronized boolean setThreadPools(ExecutorService threadPool,
ScheduledExecutorService scheduledThreadPool) {
public synchronized boolean setThreadPools(Executor threadPool, ScheduledExecutorService scheduledThreadPool) {
if (threadPool == null || scheduledThreadPool == null)
return false;
@ -289,7 +284,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
});
}
private synchronized void initialise() throws ActiveMQException {
@Override
public synchronized void initialize() throws ActiveMQException {
if (state == STATE.INITIALIZED)
return;
synchronized (stateGuard) {
@ -301,6 +297,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
setThreadPools();
topology.setExecutor(new OrderedExecutor(threadPool));
instantiateLoadBalancingPolicy();
if (discoveryGroupConfiguration != null) {
@ -568,7 +566,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public void start(Executor executor) throws Exception {
initialise();
initialize();
this.startExecutor = executor;
@ -685,7 +683,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception {
assertOpen();
initialise();
initialize();
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
@ -711,7 +709,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
boolean failoverOnInitialConnection) throws Exception {
assertOpen();
initialise();
initialize();
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
@ -748,7 +746,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
public ClientSessionFactory createSessionFactory() throws ActiveMQException {
assertOpen();
initialise();
initialize();
flushTopology();
@ -1393,10 +1391,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
if (shutdownPool) {
if (threadPool != null) {
threadPool.shutdown();
ExecutorService executorService = (ExecutorService) threadPool;
executorService.shutdown();
try {
if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
if (!executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
ActiveMQClientLogger.LOGGER.timedOutWaitingForTermination();
}
} catch (InterruptedException e) {
@ -1666,7 +1665,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
public ClientSessionFactory connect(boolean skipWarnings) throws ActiveMQException {
assertOpen();
initialise();
initialize();
createConnectors();
@ -1784,10 +1783,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
if (!isClosed() && finalizeCheck) {
ActiveMQClientLogger.LOGGER.serverLocatorNotClosed(traceException, System.identityHashCode(this));
if (ServerLocatorImpl.finalizeCallback != null) {
ServerLocatorImpl.finalizeCallback.run();
}
close();
}

View File

@ -39,7 +39,7 @@ public final class Topology {
private final Set<ClusterTopologyListener> topologyListeners;
private final Executor executor;
private Executor executor;
/**
* Used to debug operations.
@ -85,6 +85,11 @@ public final class Topology {
}
}
public Topology setExecutor(Executor executor) {
this.executor = executor;
return this;
}
/**
* It will remove all elements as if it haven't received anyone from the server.
*/

View File

@ -229,9 +229,7 @@ public class ClientThreadPoolsTest {
Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
Method initialise = ServerLocatorImpl.class.getDeclaredMethod("initialise");
initialise.setAccessible(true);
initialise.invoke(serverLocator);
serverLocator.initialize();
threadPoolField.setAccessible(true);
scheduledThreadPoolField.setAccessible(true);

View File

@ -390,25 +390,9 @@ public class ActiveMQActivation {
}
}
Thread threadTearDown = new Thread("TearDown/ActiveMQActivation") {
@Override
public void run() {
for (ActiveMQMessageHandler handler : handlersCopy) {
handler.teardown();
}
}
};
// We will first start a new thread that will call tearDown on all the instances, trying to graciously shutdown everything.
// We will then use the call-timeout to determine a timeout.
// if that failed we will then close the connection factory, and interrupt the thread
threadTearDown.start();
try {
threadTearDown.join(timeout);
} catch (InterruptedException e) {
// nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
}
if (factory != null) {
try {
@ -421,20 +405,6 @@ public class ActiveMQActivation {
factory = null;
}
if (threadTearDown.isAlive()) {
threadTearDown.interrupt();
try {
threadTearDown.join(5000);
} catch (InterruptedException e) {
// nothing to be done here.. we are going down anyways
}
if (threadTearDown.isAlive()) {
ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString());
}
}
nodes.clear();
lastReceived = false;
@ -548,9 +518,7 @@ public class ActiveMQActivation {
calculatedDestinationName = spec.getQueuePrefix() + calculatedDestinationName;
}
logger.debug("Unable to retrieve " + destinationName +
" from JNDI. Creating a new " + destinationType.getName() +
" named " + calculatedDestinationName + " to be used by the MDB.");
logger.debug("Unable to retrieve " + destinationName + " from JNDI. Creating a new " + destinationType.getName() + " named " + calculatedDestinationName + " to be used by the MDB.");
// If there is no binding on naming, we will just create a new instance
if (isTopic) {

View File

@ -189,6 +189,12 @@ public class ClusterController implements ActiveMQComponent {
serverLocator.setInitialConnectAttempts(-1);
//this is used for replication so need to use the server packet decoder
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool());
try {
serverLocator.initialize();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
locators.put(name, serverLocator);
}

View File

@ -64,6 +64,8 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
private final NetworkHealthCheck networkHealthCheck;
private boolean stopped = false;
/**
* This is a safety net in case the live sends the first {@link ReplicationLiveIsStoppingMessage}
* with code {@link org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#STOP_CALLED} and crashes before sending the second with
@ -266,6 +268,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
* @param explicitSignal the state we want to set the quorum manager to return
*/
public synchronized void causeExit(BACKUP_ACTIVATION explicitSignal) {
stopped = true;
removeListener();
this.signal = explicitSignal;
latch.countDown();
@ -287,7 +290,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize;
synchronized (voteGuard) {
while (!decision && voteAttempts++ < voteRetries) {
while (!stopped && voteAttempts++ < voteRetries) {
//the live is dead so lets vote for quorum
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID);

View File

@ -311,6 +311,7 @@ public abstract class ActiveMQTestBase extends Assert {
for (Exception exception : exceptions) {
exception.printStackTrace(System.out);
}
System.out.println(threadDump("Thread dump with reconnects happening"));
fail("Client Session Factories still trying to reconnect, see above to see where created");
}
Map<Thread, StackTraceElement[]> threadMap = Thread.getAllStackTraces();

View File

@ -1,64 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.connection;
import javax.jms.Connection;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Test;
/**
* A CloseConnectionOnGCTest
*/
public class CloseConnectionFactoryOnGCTest extends JMSTestBase {
@Test(timeout = 60000)
public void testCloseCFOnGC() throws Exception {
final AtomicInteger valueGC = new AtomicInteger(0);
ServerLocatorImpl.finalizeCallback = new Runnable() {
@Override
public void run() {
valueGC.incrementAndGet();
}
};
try {
// System.setOut(out);
for (int i = 0; i < 100; i++) {
ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
Connection conn = cf.createConnection();
cf = null;
conn.close();
conn = null;
}
forceGC();
} finally {
ServerLocatorImpl.finalizeCallback = null;
}
assertEquals("The code is throwing exceptions", 0, valueGC.get());
}
}

View File

@ -33,7 +33,6 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Test;
@ -147,11 +146,6 @@ public class ActiveMQMessageHandlerXATest extends ActiveMQRATestBase {
assertTrue(endpoint.interrupted);
assertNotNull(endpoint.lastMessage);
assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring");
Binding binding = server.getPostOffice().getBinding(MDBQUEUEPREFIXEDSIMPLE);
Wait.waitFor(() -> getMessageCount((Queue) binding.getBindable()) == 1);
long messageCount = getMessageCount((Queue) binding.getBindable());
assertEquals(1, messageCount);
}
@Test