[ARTEMIS-2105] Discovery group connectors can delay broker shutdown

Issue: https://issues.apache.org/jira/browse/ARTEMIS-2105
This commit is contained in:
Ingo Weiss 2018-10-02 15:43:52 +01:00 committed by Clebert Suconic
parent d441e7595d
commit 2450f6a376
1 changed files with 73 additions and 68 deletions

View File

@ -123,6 +123,8 @@ public class ActiveMQActivation {
private boolean lastReceived = false;
private final Object teardownLock = new Object();
// Whether we are in the failure recovery loop
private final AtomicBoolean inReconnect = new AtomicBoolean(false);
private XARecoveryConfig resourceRecovery;
@ -352,98 +354,102 @@ public class ActiveMQActivation {
/**
* Teardown the activation
*/
protected synchronized void teardown(boolean useInterrupt) {
logger.debug("Tearing down " + spec);
protected void teardown(boolean useInterrupt) {
long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout();
synchronized (teardownLock) {
if (resourceRecovery != null) {
ra.getRecoveryManager().unRegister(resourceRecovery);
}
logger.debug("Tearing down " + spec);
final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[handlers.size()];
long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout();
// We need to do from last to first as any temporary queue will have been created on the first handler
// So we invert the handlers here
for (int i = 0; i < handlers.size(); i++) {
// The index here is the complimentary so it's inverting the array
handlersCopy[i] = handlers.get(handlers.size() - i - 1);
}
if (resourceRecovery != null) {
ra.getRecoveryManager().unRegister(resourceRecovery);
}
handlers.clear();
final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[handlers.size()];
FutureLatch future = new FutureLatch(handlersCopy.length);
for (ActiveMQMessageHandler handler : handlersCopy) {
handler.interruptConsumer(future);
}
// We need to do from last to first as any temporary queue will have been created on the first handler
// So we invert the handlers here
for (int i = 0; i < handlers.size(); i++) {
// The index here is the complimentary so it's inverting the array
handlersCopy[i] = handlers.get(handlers.size() - i - 1);
}
//wait for all the consumers to complete any onmessage calls
boolean stuckThreads = !future.await(timeout);
//if any are stuck then we need to interrupt them
if (stuckThreads && useInterrupt) {
handlers.clear();
FutureLatch future = new FutureLatch(handlersCopy.length);
for (ActiveMQMessageHandler handler : handlersCopy) {
Thread interruptThread = handler.getCurrentThread();
if (interruptThread != null) {
try {
logger.tracef("Interrupting thread %s", interruptThread.getName());
} catch (Throwable justLog) {
logger.warn(justLog);
}
try {
interruptThread.interrupt();
} catch (Throwable e) {
//ok
}
}
handler.interruptConsumer(future);
}
}
Runnable runTearDown = new Runnable() {
@Override
public void run() {
//wait for all the consumers to complete any onmessage calls
boolean stuckThreads = !future.await(timeout);
//if any are stuck then we need to interrupt them
if (stuckThreads && useInterrupt) {
for (ActiveMQMessageHandler handler : handlersCopy) {
handler.teardown();
Thread interruptThread = handler.getCurrentThread();
if (interruptThread != null) {
try {
logger.tracef("Interrupting thread %s", interruptThread.getName());
} catch (Throwable justLog) {
logger.warn(justLog);
}
try {
interruptThread.interrupt();
} catch (Throwable e) {
//ok
}
}
}
}
};
Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown);
Runnable runTearDown = new Runnable() {
@Override
public void run() {
for (ActiveMQMessageHandler handler : handlersCopy) {
handler.teardown();
}
}
};
try {
threadTearDown.join(timeout);
} catch (InterruptedException e) {
// nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
}
Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown);
if (factory != null) {
try {
// closing the factory will help making sure pending threads are closed
factory.close();
} catch (Throwable e) {
ActiveMQRALogger.LOGGER.unableToCloseFactory(e);
threadTearDown.join(timeout);
} catch (InterruptedException e) {
// nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
}
factory = null;
}
if (factory != null) {
try {
// closing the factory will help making sure pending threads are closed
factory.close();
} catch (Throwable e) {
ActiveMQRALogger.LOGGER.unableToCloseFactory(e);
}
if (threadTearDown.isAlive()) {
threadTearDown.interrupt();
try {
threadTearDown.join(5000);
} catch (InterruptedException e) {
// nothing to be done here.. we are going down anyways
factory = null;
}
if (threadTearDown.isAlive()) {
ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString());
threadTearDown.interrupt();
try {
threadTearDown.join(5000);
} catch (InterruptedException e) {
// nothing to be done here.. we are going down anyways
}
if (threadTearDown.isAlive()) {
ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString());
}
}
nodes.clear();
lastReceived = false;
logger.debug("Tearing down complete " + this);
}
nodes.clear();
lastReceived = false;
logger.debug("Tearing down complete " + this);
}
protected void setupCF() throws Exception {
@ -467,7 +473,6 @@ public class ActiveMQActivation {
} else {
factory = ra.newConnectionFactory(spec);
}
}
/**