This commit is contained in:
Clebert Suconic 2018-08-14 00:02:25 -04:00
commit e549a153a4
5 changed files with 105 additions and 40 deletions

View File

@ -409,6 +409,15 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
return handler; return handler;
} }
@Override
public Thread getCurrentThread() {
if (onMessageThread != null) {
return onMessageThread;
}
return receiverThread;
}
// Must be synchronized since messages may be arriving while handler is being set and might otherwise end // Must be synchronized since messages may be arriving while handler is being set and might otherwise end
// up not queueing enough executors - so messages get stranded // up not queueing enough executors - so messages get stranded
@Override @Override

View File

@ -41,6 +41,8 @@ public interface ClientConsumerInternal extends ClientConsumer {
void clear(boolean waitForOnMessage) throws ActiveMQException; void clear(boolean waitForOnMessage) throws ActiveMQException;
Thread getCurrentThread();
/** /**
* To be called by things like MDBs during shutdown of the server * To be called by things like MDBs during shutdown of the server
* *

View File

@ -26,9 +26,12 @@ import javax.naming.InitialContext;
import javax.resource.ResourceException; import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory; import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work; import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager; import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -56,6 +59,7 @@ import org.apache.activemq.artemis.ra.ActiveMQRALogger;
import org.apache.activemq.artemis.ra.ActiveMQRaUtils; import org.apache.activemq.artemis.ra.ActiveMQRaUtils;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig; import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.PasswordMaskingUtil; import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -242,7 +246,7 @@ public class ActiveMQActivation {
logger.trace("start()"); logger.trace("start()");
} }
deliveryActive.set(true); deliveryActive.set(true);
ra.getWorkManager().scheduleWork(new SetupActivation()); scheduleWork(new SetupActivation());
} }
/** /**
@ -282,7 +286,7 @@ public class ActiveMQActivation {
} }
deliveryActive.set(false); deliveryActive.set(false);
teardown(); teardown(true);
} }
/** /**
@ -348,7 +352,7 @@ public class ActiveMQActivation {
/** /**
* Teardown the activation * Teardown the activation
*/ */
protected synchronized void teardown() { protected synchronized void teardown(boolean useInterrupt) {
logger.debug("Tearing down " + spec); logger.debug("Tearing down " + spec);
long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout(); long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout();
@ -369,28 +373,27 @@ public class ActiveMQActivation {
handlers.clear(); handlers.clear();
FutureLatch future = new FutureLatch(handlersCopy.length); FutureLatch future = new FutureLatch(handlersCopy.length);
List<Thread> interruptThreads = new ArrayList<>();
for (ActiveMQMessageHandler handler : handlersCopy) { for (ActiveMQMessageHandler handler : handlersCopy) {
Thread thread = handler.interruptConsumer(future); handler.interruptConsumer(future);
if (thread != null) {
interruptThreads.add(thread);
}
} }
//wait for all the consumers to complete any onmessage calls //wait for all the consumers to complete any onmessage calls
boolean stuckThreads = !future.await(timeout); boolean stuckThreads = !future.await(timeout);
//if any are stuck then we need to interrupt them //if any are stuck then we need to interrupt them
if (stuckThreads) { if (stuckThreads && useInterrupt) {
for (Thread interruptThread : interruptThreads) { for (ActiveMQMessageHandler handler : handlersCopy) {
try { Thread interruptThread = handler.getCurrentThread();
interruptThread.interrupt(); if (interruptThread != null) {
} catch (Exception e) { try {
//ok interruptThread.interrupt();
} catch (Throwable e) {
//ok
}
} }
} }
} }
Thread threadTearDown = new Thread("TearDown/ActiveMQActivation") { Runnable runTearDown = new Runnable() {
@Override @Override
public void run() { public void run() {
for (ActiveMQMessageHandler handler : handlersCopy) { for (ActiveMQMessageHandler handler : handlersCopy) {
@ -399,10 +402,7 @@ public class ActiveMQActivation {
} }
}; };
// We will first start a new thread that will call tearDown on all the instances, trying to graciously shutdown everything. Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown);
// We will then use the call-timeout to determine a timeout.
// if that failed we will then close the connection factory, and interrupt the thread
threadTearDown.start();
try { try {
threadTearDown.join(timeout); threadTearDown.join(timeout);
@ -550,9 +550,7 @@ public class ActiveMQActivation {
calculatedDestinationName = spec.getQueuePrefix() + calculatedDestinationName; calculatedDestinationName = spec.getQueuePrefix() + calculatedDestinationName;
} }
logger.debug("Unable to retrieve " + destinationName + logger.debug("Unable to retrieve " + destinationName + " from JNDI. Creating a new " + destinationType.getName() + " named " + calculatedDestinationName + " to be used by the MDB.");
" from JNDI. Creating a new " + destinationType.getName() +
" named " + calculatedDestinationName + " to be used by the MDB.");
// If there is no binding on naming, we will just create a new instance // If there is no binding on naming, we will just create a new instance
if (isTopic) { if (isTopic) {
@ -602,18 +600,41 @@ public class ActiveMQActivation {
return buffer.toString(); return buffer.toString();
} }
public void startReconnectThread(final String threadName) { public void startReconnectThread(final String cause) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Starting reconnect Thread " + threadName + " on MDB activation " + this); logger.trace("Starting reconnect Thread " + cause + " on MDB activation " + this);
} }
Runnable runnable = new Runnable() { try {
@Override // We have to use the worker otherwise we may get the wrong classLoader
public void run() { scheduleWork(new ReconnectWork(cause));
reconnect(null); } catch (Exception e) {
} logger.warn("Could not reconnect because worker is down", e);
}; }
Thread t = new Thread(runnable, threadName); }
private static Thread startThread(String name, Runnable run) {
ClassLoader tccl;
try {
tccl = AccessController.doPrivileged(new PrivilegedExceptionAction<ClassLoader>() {
@Override
public ClassLoader run() {
return ActiveMQActivation.class.getClassLoader();
}
});
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
tccl = null;
}
ActiveMQThreadFactory factory = new ActiveMQThreadFactory(name, true, tccl);
Thread t = factory.newThread(run);
t.start(); t.start();
return t;
}
private void scheduleWork(Work run) throws WorkException {
ra.getWorkManager().scheduleWork(run);
} }
/** /**
@ -621,7 +642,7 @@ public class ActiveMQActivation {
* *
* @param failure if reconnecting in the event of a failure * @param failure if reconnecting in the event of a failure
*/ */
public void reconnect(Throwable failure) { public void reconnect(Throwable failure, boolean useInterrupt) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("reconnecting activation " + this); logger.trace("reconnecting activation " + this);
} }
@ -644,7 +665,7 @@ public class ActiveMQActivation {
try { try {
Throwable lastException = failure; Throwable lastException = failure;
while (deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts)) { while (deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts)) {
teardown(); teardown(useInterrupt);
try { try {
Thread.sleep(setupInterval); Thread.sleep(setupInterval);
@ -697,7 +718,7 @@ public class ActiveMQActivation {
try { try {
setup(); setup();
} catch (Throwable t) { } catch (Throwable t) {
reconnect(t); reconnect(t, false);
} }
} }
@ -706,6 +727,30 @@ public class ActiveMQActivation {
} }
} }
/**
* Handles reconnecting
*/
private class ReconnectWork implements Work {
final String cause;
ReconnectWork(String cause) {
this.cause = cause;
}
@Override
public void release() {
}
@Override
public void run() {
logger.tracef("Starting reconnect for %s", cause);
reconnect(null, false);
}
}
private class RebalancingListener implements ClusterTopologyListener { private class RebalancingListener implements ClusterTopologyListener {
@Override @Override

View File

@ -124,17 +124,13 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
if (!spec.isShareSubscriptions()) { if (!spec.isShareSubscriptions()) {
throw ActiveMQRALogger.LOGGER.canNotCreatedNonSharedSubscriber(); throw ActiveMQRALogger.LOGGER.canNotCreatedNonSharedSubscriber();
} else if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { } else if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
logger.debug("the mdb on destination " + queueName + " already had " + logger.debug("the mdb on destination " + queueName + " already had " + subResponse.getConsumerCount() + " consumers but the MDB is configured to share subscriptions, so no exceptions are thrown");
subResponse.getConsumerCount() +
" consumers but the MDB is configured to share subscriptions, so no exceptions are thrown");
} }
} }
SimpleString oldFilterString = subResponse.getFilterString(); SimpleString oldFilterString = subResponse.getFilterString();
boolean selectorChanged = selector == null && oldFilterString != null || boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || (oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector));
oldFilterString == null && selector != null ||
(oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector));
SimpleString oldTopicName = subResponse.getAddress(); SimpleString oldTopicName = subResponse.getAddress();
@ -198,6 +194,14 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
return useXA ? session : null; return useXA ? session : null;
} }
public Thread getCurrentThread() {
if (consumer == null) {
return null;
}
return consumer.getCurrentThread();
}
public Thread interruptConsumer(FutureLatch future) { public Thread interruptConsumer(FutureLatch future) {
try { try {
if (consumer != null) { if (consumer != null) {

View File

@ -675,6 +675,11 @@ public class LargeMessageBufferTest extends ActiveMQTestBase {
return null; return null;
} }
@Override
public Thread getCurrentThread() {
return null;
}
@Override @Override
public ClientMessage receive(final long timeout) throws ActiveMQException { public ClientMessage receive(final long timeout) throws ActiveMQException {
return null; return null;