ARTEMIS-2290 JMSBridgeImpl::stop is failing when called from FailureHandler

This commit is contained in:
Francesco Nigro 2019-03-19 12:10:07 +01:00 committed by Justin Bertram
parent affb6a4cc4
commit dc1cfa3536
1 changed file with 293 additions and 177 deletions

View File

@ -45,13 +45,16 @@ import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener; import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType; import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
@ -113,8 +116,8 @@ public final class JMSBridgeImpl implements JMSBridge {
private boolean started; private boolean started;
private final Object stoppingGuard = new Object(); private static final Object stoppingGuard = new Object();
private boolean stopping = false; private volatile boolean stopping = false;
private final LinkedList<Message> messages; private final LinkedList<Message> messages;
@ -142,7 +145,13 @@ public final class JMSBridgeImpl implements JMSBridge {
private MessageProducer targetProducer; private MessageProducer targetProducer;
private BatchTimeChecker timeChecker; private CountDownLatch batchTimeCheckerFinished;
private Future<?> batchTimeCheckerTask;
private CountDownLatch sourceReceiverFinished;
private Future<?> sourceReceiverTask;
private ExecutorService executor; private ExecutorService executor;
@ -418,17 +427,25 @@ public final class JMSBridgeImpl implements JMSBridge {
ActiveMQJMSBridgeLogger.LOGGER.trace("Starting time checker thread"); ActiveMQJMSBridgeLogger.LOGGER.trace("Starting time checker thread");
} }
timeChecker = new BatchTimeChecker();
executor.execute(timeChecker);
batchExpiryTime = System.currentTimeMillis() + maxBatchTime; batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
batchTimeCheckerFinished = new CountDownLatch(1);
batchTimeCheckerTask = executor.submit(new BatchTimeChecker());
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Started time checker thread"); ActiveMQJMSBridgeLogger.LOGGER.trace("Started time checker thread");
} }
} else {
batchTimeCheckerFinished = null;
batchTimeCheckerTask = null;
} }
executor.execute(new SourceReceiver()); sourceReceiverFinished = new CountDownLatch(1);
sourceReceiverTask = executor.submit(new SourceReceiver());
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Started " + this); ActiveMQJMSBridgeLogger.LOGGER.trace("Started " + this);
@ -451,6 +468,43 @@ public final class JMSBridgeImpl implements JMSBridge {
@Override @Override
public void stop() throws Exception { public void stop() throws Exception {
stop(false);
}
/**
 * Waits, for at most the given timeout, until {@code finished} has been counted down to zero.
 * <p>
 * Both failure modes are reported only at trace level and turned into a {@code false} result;
 * no exception leaves this method.
 *
 * @param finished the latch counted down by the task when it terminates
 * @param time     the maximum time to wait
 * @param timeUnit the unit of {@code time}
 * @param taskName a human readable task name, used only in the trace messages
 * @return {@code true} if the latch reached zero within the timeout, {@code false} if the
 *         timeout elapsed first or the calling thread was interrupted while waiting
 */
private boolean awaitTaskCompletion(CountDownLatch finished, long time, TimeUnit timeUnit, String taskName) {
   try {
      final boolean taskCompleted = finished.await(time, timeUnit);
      if (!taskCompleted) {
         ActiveMQJMSBridgeLogger.LOGGER.tracef("%s task on bridge %s wasn't able to finish", taskName, bridgeName);
      }
      return taskCompleted;
   } catch (InterruptedException ie) {
      // The interruption is reported as a plain "not completed" result, so re-assert the
      // interrupt status: otherwise the request would be silently lost for the code that
      // owns this thread. Any later interruptible wait on this thread will then fail fast
      // instead of blocking, which still yields a "not completed" outcome.
      Thread.currentThread().interrupt();
      ActiveMQJMSBridgeLogger.LOGGER.tracef("An interruption has happened on bridge %s while waiting %s task to finish", bridgeName, taskName);
      return false;
   }
}
/**
 * Waits for each of the given named latches in turn, sharing one overall time budget:
 * the time spent waiting on one latch is subtracted from what is left for the following ones
 * (never going below zero).
 * <p>
 * Entries whose latch is {@code null} are skipped and do not affect the result.
 *
 * @param time                 the overall maximum time to wait
 * @param timeUnit             the unit of {@code time}
 * @param namedTaskCompletions pairs of (task name, completion latch); the name is only passed on
 *                             to {@code awaitTaskCompletion}
 * @return {@code true} only if every non-null latch was reported as completed
 */
private boolean awaitAll(long time, TimeUnit timeUnit, Pair<String, CountDownLatch>... namedTaskCompletions) {
   long budgetNanos = timeUnit.toNanos(time);
   boolean everyTaskDone = true;
   for (Pair<String, CountDownLatch> entry : namedTaskCompletions) {
      final CountDownLatch latch = entry.getB();
      if (latch == null) {
         // nothing to wait on for this entry
         continue;
      }
      final String name = entry.getA();
      final long begin = System.nanoTime();
      final boolean done = awaitTaskCompletion(latch, budgetNanos, TimeUnit.NANOSECONDS, name);
      final long spent = System.nanoTime() - begin;
      // keep waiting on the remaining latches even after a failure, but remember it
      everyTaskDone &= done;
      budgetNanos = Math.max(0, budgetNanos - spent);
   }
   return everyTaskDone;
}
private void stop(boolean isFailureHandler) throws Exception {
synchronized (stoppingGuard) { synchronized (stoppingGuard) {
if (stopping) if (stopping)
return; return;
@ -461,74 +515,119 @@ public final class JMSBridgeImpl implements JMSBridge {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Stopping " + this); ActiveMQJMSBridgeLogger.LOGGER.trace("Stopping " + this);
} }
Connection sourceConn = this.sourceConn;
if (!connectedSource && sourceConn != null) { if (!connectedSource && sourceConn != null) {
sourceConn.close();
}
if (!connectedTarget && targetConn != null) {
targetConn.close();
}
synchronized (lock) {
started = false;
executor.shutdownNow();
}
boolean ok = executor.awaitTermination(60, TimeUnit.SECONDS);
if (!ok) {
throw new Exception("fail to stop JMS Bridge");
}
if (tx != null) {
// Terminate any transaction
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx");
}
stopSessionFailover();
try {
tx.rollback();
abortedMessageCount += messages.size();
} catch (Exception ignore) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to rollback", ignore);
}
}
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Rolled back remaining tx");
}
}
if (sourceConn != null) {
try { try {
sourceConn.close(); sourceConn.close();
} catch (Exception ignore) { } catch (Throwable t) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close source connection on bridge %s", t);
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source conn", ignore); } finally {
} sourceConn = null;
} }
} }
Connection targetConn = this.targetConn;
if (targetConn != null) { if (!connectedTarget && targetConn != null) {
try { try {
targetConn.close(); targetConn.close();
} catch (Exception ignore) { } catch (Throwable t) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close target connection on bridge %s", t);
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close target conn", ignore); } finally {
targetConn = null;
}
}
final CountDownLatch sourceReceiverFinished = this.sourceReceiverFinished;
final Future<?> sourceReceiverTask = this.sourceReceiverTask;
final CountDownLatch batchTimeCheckerFinished = this.batchTimeCheckerFinished;
final Future<?> batchTimeCheckerTask = this.batchTimeCheckerTask;
this.sourceReceiverFinished = null;
this.sourceReceiverTask = null;
this.batchTimeCheckerFinished = null;
this.batchTimeCheckerTask = null;
synchronized (lock) {
started = false;
if (!isFailureHandler) {
executor.shutdownNow();
} else {
if (sourceReceiverTask != null) {
sourceReceiverTask.cancel(true);
}
if (batchTimeCheckerTask != null) {
batchTimeCheckerTask.cancel(true);
} }
} }
} }
if (messages.size() > 0) { final boolean ok;
// Clear outstanding messages so they don't get retransmitted and duplicated on the other side of the bridge if (!isFailureHandler) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages before stopping..."); ok = executor.awaitTermination(60, TimeUnit.SECONDS);
messages.clear(); } else {
ok = awaitAll(60, TimeUnit.SECONDS,
new Pair<>("SourceReceiver", sourceReceiverFinished),
new Pair<>("BatchTimeChecker", batchTimeCheckerFinished));
} }
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { try {
ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this);
if (!ok) {
throw new Exception("the bridge hasn't cleanly stopped: transactions, connections or messages could have leaked!");
}
if (tx != null) {
// Terminate any transaction
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx");
}
try {
stopSessionFailover();
try {
tx.rollback();
abortedMessageCount += messages.size();
} catch (Exception ignore) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to rollback", ignore);
}
}
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Rolled back remaining tx");
}
} catch (Throwable t) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed stopSessionFailover", t);
}
}
if (sourceConn != null) {
try {
sourceConn.close();
} catch (Exception ignore) {
ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close source connection on bridge %s", ignore);
}
}
if (targetConn != null) {
try {
targetConn.close();
} catch (Exception ignore) {
ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close target connection on bridge %s", ignore);
}
}
if (messages.size() > 0) {
// Clear outstanding messages so they don't get retransmitted and duplicated on the other side of the bridge
ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages before stopping...");
messages.clear();
}
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this);
}
} finally {
if (isFailureHandler) {
executor.shutdownNow();
}
} }
} }
} }
@ -1269,13 +1368,16 @@ public final class JMSBridgeImpl implements JMSBridge {
} }
} }
private void pause(final long interval) { /**
long start = System.currentTimeMillis(); * Pause the calling thread for the given {@code millis}: it returns {@code true} if not interrupted, {@code false} otherwise.
while (System.currentTimeMillis() - start < failureRetryInterval) { */
try { private static boolean pause(final long millis) {
Thread.sleep(failureRetryInterval); assert millis >= 0;
} catch (InterruptedException ex) { try {
} Thread.sleep(millis);
return true;
} catch (InterruptedException ex) {
return false;
} }
} }
@ -1286,7 +1388,7 @@ public final class JMSBridgeImpl implements JMSBridge {
int count = 0; int count = 0;
while (true && !stopping) { while (!stopping) {
boolean ok = setupJMSObjects(); boolean ok = setupJMSObjects();
if (ok) { if (ok) {
@ -1301,7 +1403,10 @@ public final class JMSBridgeImpl implements JMSBridge {
ActiveMQJMSBridgeLogger.LOGGER.failedToSetUpBridge(failureRetryInterval, bridgeName); ActiveMQJMSBridgeLogger.LOGGER.failedToSetUpBridge(failureRetryInterval, bridgeName);
pause(failureRetryInterval); if (!pause(failureRetryInterval)) {
ActiveMQJMSBridgeLogger.LOGGER.tracef("Interrupted while pausing the bridge %s", bridgeName);
return false;
}
} }
// If we get here then we exceeded maxRetries // If we get here then we exceeded maxRetries
@ -1649,87 +1754,88 @@ public final class JMSBridgeImpl implements JMSBridge {
* to ensure that message delivery does not happen concurrently with * to ensure that message delivery does not happen concurrently with
* transaction enlistment of the XAResource (see HORNETQ-27) * transaction enlistment of the XAResource (see HORNETQ-27)
*/ */
private final class SourceReceiver extends Thread { private final class SourceReceiver implements Runnable {
SourceReceiver() {
super("jmsbridge-source-receiver-thread");
}
@Override @Override
@SuppressWarnings("WaitNotInLoop") @SuppressWarnings("WaitNotInLoop")
// both lock.wait(..) either returns, throws or continue, thus avoiding spurious wakes // both lock.wait(..) either returns, throws or continue, thus avoiding spurious wakes
public void run() { public void run() {
while (started) { final CountDownLatch finished = sourceReceiverFinished;
if (stopping) { try {
return; while (started) {
} if (stopping) {
synchronized (lock) { return;
if (paused || failed) { }
try { synchronized (lock) {
lock.wait(500); if (paused || failed) {
} catch (InterruptedException e) { try {
if (stopping) { lock.wait(500);
return; } catch (InterruptedException e) {
if (stopping) {
return;
}
throw new ActiveMQInterruptedException(e);
} }
throw new ActiveMQInterruptedException(e); continue;
} }
continue;
}
Message msg = null; Message msg = null;
try {
msg = sourceConsumer.receive(1000);
if (msg instanceof ActiveMQMessage) {
// We need to check the buffer mainly in the case of LargeMessages
// As we need to reconstruct the buffer before resending the message
((ActiveMQMessage) msg).checkBuffer();
}
} catch (JMSException jmse) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " exception while receiving a message", jmse);
}
}
if (msg == null) {
try { try {
lock.wait(500); msg = sourceConsumer.receive(1000);
} catch (InterruptedException e) {
if (msg instanceof ActiveMQMessage) {
// We need to check the buffer mainly in the case of LargeMessages
// As we need to reconstruct the buffer before resending the message
((ActiveMQMessage) msg).checkBuffer();
}
} catch (JMSException jmse) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted"); ActiveMQJMSBridgeLogger.LOGGER.trace(this + " exception while receiving a message", jmse);
} }
if (stopping) {
return;
}
throw new ActiveMQInterruptedException(e);
}
continue;
}
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " received message " + msg);
}
messages.add(msg);
batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime);
}
if (maxBatchSize != -1 && messages.size() >= maxBatchSize) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " maxBatchSize has been reached so sending batch");
} }
sendBatch(); if (msg == null) {
try {
lock.wait(500);
} catch (InterruptedException e) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted");
}
if (stopping) {
return;
}
throw new ActiveMQInterruptedException(e);
}
continue;
}
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent batch"); ActiveMQJMSBridgeLogger.LOGGER.trace(this + " received message " + msg);
}
messages.add(msg);
batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime);
}
if (maxBatchSize != -1 && messages.size() >= maxBatchSize) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " maxBatchSize has been reached so sending batch");
}
sendBatch();
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent batch");
}
} }
} }
} }
} finally {
finished.countDown();
} }
} }
} }
@ -1764,8 +1870,9 @@ public final class JMSBridgeImpl implements JMSBridge {
ActiveMQJMSBridgeLogger.LOGGER.errorConnectingBridge(bridgeName); ActiveMQJMSBridgeLogger.LOGGER.errorConnectingBridge(bridgeName);
try { try {
stop(); stop(true);
} catch (Exception ignore) { } catch (Throwable ignore) {
ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to stop bridge %s from %s ", bridgeName, this.getClass().getSimpleName(), ignore);
} }
} }
@ -1785,7 +1892,11 @@ public final class JMSBridgeImpl implements JMSBridge {
if (maxRetries > 0 || maxRetries == -1) { if (maxRetries > 0 || maxRetries == -1) {
ActiveMQJMSBridgeLogger.LOGGER.bridgeRetry(failureRetryInterval, bridgeName); ActiveMQJMSBridgeLogger.LOGGER.bridgeRetry(failureRetryInterval, bridgeName);
pause(failureRetryInterval); if (!pause(failureRetryInterval)) {
ActiveMQJMSBridgeLogger.LOGGER.tracef("Interrupted while pausing the bridge %s", bridgeName);
failed();
return;
}
// Now we try // Now we try
ok = setupJMSObjectsWithRetry(); ok = setupJMSObjectsWithRetry();
@ -1839,53 +1950,58 @@ public final class JMSBridgeImpl implements JMSBridge {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " running"); ActiveMQJMSBridgeLogger.LOGGER.trace(this + " running");
} }
synchronized (lock) { final CountDownLatch completed = batchTimeCheckerFinished;
while (started) { try {
long toWait = batchExpiryTime - System.currentTimeMillis(); synchronized (lock) {
while (started) {
long toWait = batchExpiryTime - System.currentTimeMillis();
if (toWait <= 0) { if (toWait <= 0) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waited enough"); ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waited enough");
} }
synchronized (lock) { synchronized (lock) {
if (!failed && !messages.isEmpty()) { if (!failed && !messages.isEmpty()) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " got some messages so sending batch"); ActiveMQJMSBridgeLogger.LOGGER.trace(this + " got some messages so sending batch");
} }
sendBatch(); sendBatch();
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent batch"); ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent batch");
}
} }
} }
batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
} else {
try {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waiting for " + toWait);
}
lock.wait(toWait);
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " woke up");
}
} catch (InterruptedException e) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted");
}
if (stopping) {
return;
}
throw new ActiveMQInterruptedException(e);
}
} }
batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
} else {
try {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waiting for " + toWait);
}
lock.wait(toWait);
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " woke up");
}
} catch (InterruptedException e) {
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted");
}
if (stopping) {
return;
}
throw new ActiveMQInterruptedException(e);
}
} }
} }
} finally {
completed.countDown();
} }
} }
} }
@ -2037,8 +2153,8 @@ public final class JMSBridgeImpl implements JMSBridge {
} }
/* /*
* make sure we reset the connected flags * make sure we reset the connected flags
* */ * */
if (result == FailoverEventType.FAILOVER_COMPLETED) { if (result == FailoverEventType.FAILOVER_COMPLETED) {
if (isSource) { if (isSource) {
connectedSource = true; connectedSource = true;