diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java index a5e1c0e724..2da550b951 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java @@ -45,13 +45,16 @@ import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import java.util.ServiceLoader; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.client.FailoverEventListener; import org.apache.activemq.artemis.api.core.client.FailoverEventType; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; @@ -113,8 +116,8 @@ public final class JMSBridgeImpl implements JMSBridge { private boolean started; - private final Object stoppingGuard = new Object(); - private boolean stopping = false; + private static final Object stoppingGuard = new Object(); + private volatile boolean stopping = false; private final LinkedList messages; @@ -142,7 +145,13 @@ public final class JMSBridgeImpl implements JMSBridge { private MessageProducer targetProducer; - private BatchTimeChecker timeChecker; + private CountDownLatch batchTimeCheckerFinished; + + private Future batchTimeCheckerTask; + + private CountDownLatch sourceReceiverFinished; + + private Future sourceReceiverTask; private ExecutorService executor; @@ -418,17 +427,25 @@ public final class JMSBridgeImpl implements JMSBridge { ActiveMQJMSBridgeLogger.LOGGER.trace("Starting time checker thread"); } - timeChecker = new BatchTimeChecker(); - - executor.execute(timeChecker); batchExpiryTime = System.currentTimeMillis() + maxBatchTime; + batchTimeCheckerFinished = new CountDownLatch(1); + + batchTimeCheckerTask = executor.submit(new BatchTimeChecker()); + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { ActiveMQJMSBridgeLogger.LOGGER.trace("Started time checker thread"); } + } else { + + batchTimeCheckerFinished = null; + + batchTimeCheckerTask = null; } - executor.execute(new SourceReceiver()); + sourceReceiverFinished = new CountDownLatch(1); + + sourceReceiverTask = executor.submit(new SourceReceiver()); if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { ActiveMQJMSBridgeLogger.LOGGER.trace("Started " + this); @@ -451,6 +468,43 @@ public final class JMSBridgeImpl implements JMSBridge { @Override public void stop() throws Exception { + stop(false); + } + + private boolean awaitTaskCompletion(CountDownLatch finished, long time, TimeUnit timeUnit, String taskName) { + boolean taskCompleted; + try { + taskCompleted = finished.await(time, timeUnit); + if (!taskCompleted) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("%s task on bridge %s wasn't able to finish", taskName, bridgeName); + } + return taskCompleted; + } catch (InterruptedException ie) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("An interruption has happened on bridge %s while waiting %s task to finish", bridgeName, taskName); + return false; + } + } + + private boolean awaitAll(long time, TimeUnit timeUnit, Pair... namedTaskCompletions) { + long remainingNanos = timeUnit.toNanos(time); + boolean allFinished = true; + for (Pair namedTaskCompletion : namedTaskCompletions) { + final CountDownLatch taskCompletion = namedTaskCompletion.getB(); + if (taskCompletion != null) { + final String taskName = namedTaskCompletion.getA(); + final long start = System.nanoTime(); + final boolean taskCompleted = awaitTaskCompletion(taskCompletion, remainingNanos, TimeUnit.NANOSECONDS, taskName); + final long elapsed = System.nanoTime() - start; + if (!taskCompleted) { + allFinished = false; + } + remainingNanos = Math.max(0, remainingNanos - elapsed); + } + } + return allFinished; + } + + private void stop(boolean isFailureHandler) throws Exception { synchronized (stoppingGuard) { if (stopping) return; @@ -461,74 +515,119 @@ public final class JMSBridgeImpl implements JMSBridge { if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { ActiveMQJMSBridgeLogger.LOGGER.trace("Stopping " + this); } + Connection sourceConn = this.sourceConn; if (!connectedSource && sourceConn != null) { - sourceConn.close(); - } - if (!connectedTarget && targetConn != null) { - targetConn.close(); - } - synchronized (lock) { - started = false; - - executor.shutdownNow(); - } - - boolean ok = executor.awaitTermination(60, TimeUnit.SECONDS); - - if (!ok) { - throw new Exception("fail to stop JMS Bridge"); - } - - if (tx != null) { - // Terminate any transaction - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx"); - } - - stopSessionFailover(); - - try { - tx.rollback(); - abortedMessageCount += messages.size(); - } catch (Exception ignore) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to rollback", ignore); - } - } - - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Rolled back remaining tx"); - } - } - - if (sourceConn != null) { try { sourceConn.close(); - } catch (Exception ignore) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source conn", ignore); - } + } catch (Throwable t) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close source connection on bridge %s", t); + } finally { + sourceConn = null; } } - - if (targetConn != null) { + Connection targetConn = this.targetConn; + if (!connectedTarget && targetConn != null) { try { targetConn.close(); - } catch (Exception ignore) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close target conn", ignore); + } catch (Throwable t) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close target connection on bridge %s", t); + } finally { + targetConn = null; + } + } + final CountDownLatch sourceReceiverFinished = this.sourceReceiverFinished; + final Future sourceReceiverTask = this.sourceReceiverTask; + final CountDownLatch batchTimeCheckerFinished = this.batchTimeCheckerFinished; + final Future batchTimeCheckerTask = this.batchTimeCheckerTask; + this.sourceReceiverFinished = null; + this.sourceReceiverTask = null; + this.batchTimeCheckerFinished = null; + this.batchTimeCheckerTask = null; + synchronized (lock) { + started = false; + if (!isFailureHandler) { + executor.shutdownNow(); + } else { + if (sourceReceiverTask != null) { + sourceReceiverTask.cancel(true); + } + if (batchTimeCheckerTask != null) { + batchTimeCheckerTask.cancel(true); } } } - if (messages.size() > 0) { - // Clear outstanding messages so they don't get retransmitted and duplicated on the other side of the bridge - ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages before stopping..."); - messages.clear(); + final boolean ok; + if (!isFailureHandler) { + ok = executor.awaitTermination(60, TimeUnit.SECONDS); + } else { + ok = awaitAll(60, TimeUnit.SECONDS, + new Pair<>("SourceReceiver", sourceReceiverFinished), + new Pair<>("BatchTimeChecker", batchTimeCheckerFinished)); } - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this); + try { + + if (!ok) { + throw new Exception("the bridge hasn't cleanly stopped: transactions, connections or messages could have leaked!"); + } + + if (tx != null) { + // Terminate any transaction + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx"); + } + + try { + stopSessionFailover(); + + try { + tx.rollback(); + abortedMessageCount += messages.size(); + } catch (Exception ignore) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to rollback", ignore); + } + } + + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Rolled back remaining tx"); + } + } catch (Throwable t) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Failed stopSessionFailover", t); + } + + } + + if (sourceConn != null) { + try { + sourceConn.close(); + } catch (Exception ignore) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close source connection on bridge %s", ignore); + } + } + + if (targetConn != null) { + try { + targetConn.close(); + } catch (Exception ignore) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close target connection on bridge %s", ignore); + } + } + + if (messages.size() > 0) { + // Clear outstanding messages so they don't get retransmitted and duplicated on the other side of the bridge + ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages before stopping..."); + messages.clear(); + } + + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this); + } + } finally { + if (isFailureHandler) { + executor.shutdownNow(); + } } } } @@ -1269,13 +1368,16 @@ public final class JMSBridgeImpl implements JMSBridge { } } - private void pause(final long interval) { - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < failureRetryInterval) { - try { - Thread.sleep(failureRetryInterval); - } catch (InterruptedException ex) { - } + /** + * Pause the calling thread for the given {@code millis}: it returns {@code true} if not interrupted, {@code false} otherwise. + */ + private static boolean pause(final long millis) { + assert millis >= 0; + try { + Thread.sleep(millis); + return true; + } catch (InterruptedException ex) { + return false; } } @@ -1286,7 +1388,7 @@ public final class JMSBridgeImpl implements JMSBridge { int count = 0; - while (true && !stopping) { + while (!stopping) { boolean ok = setupJMSObjects(); if (ok) { @@ -1301,7 +1403,10 @@ public final class JMSBridgeImpl implements JMSBridge { ActiveMQJMSBridgeLogger.LOGGER.failedToSetUpBridge(failureRetryInterval, bridgeName); - pause(failureRetryInterval); + if (!pause(failureRetryInterval)) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Interrupted while pausing the bridge %s", bridgeName); + return false; + } } // If we get here then we exceeded maxRetries @@ -1649,87 +1754,88 @@ public final class JMSBridgeImpl implements JMSBridge { * to ensure that message delivery does not happen concurrently with * transaction enlistment of the XAResource (see HORNETQ-27) */ - private final class SourceReceiver extends Thread { - - SourceReceiver() { - super("jmsbridge-source-receiver-thread"); - } + private final class SourceReceiver implements Runnable { @Override @SuppressWarnings("WaitNotInLoop") // both lock.wait(..) either returns, throws or continue, thus avoiding spurious wakes public void run() { - while (started) { - if (stopping) { - return; - } - synchronized (lock) { - if (paused || failed) { - try { - lock.wait(500); - } catch (InterruptedException e) { - if (stopping) { - return; + final CountDownLatch finished = sourceReceiverFinished; + try { + while (started) { + if (stopping) { + return; + } + synchronized (lock) { + if (paused || failed) { + try { + lock.wait(500); + } catch (InterruptedException e) { + if (stopping) { + return; + } + throw new ActiveMQInterruptedException(e); } - throw new ActiveMQInterruptedException(e); + continue; } - continue; - } - Message msg = null; - try { - msg = sourceConsumer.receive(1000); - - if (msg instanceof ActiveMQMessage) { - // We need to check the buffer mainly in the case of LargeMessages - // As we need to reconstruct the buffer before resending the message - ((ActiveMQMessage) msg).checkBuffer(); - } - } catch (JMSException jmse) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " exception while receiving a message", jmse); - } - } - - if (msg == null) { + Message msg = null; try { - lock.wait(500); - } catch (InterruptedException e) { + msg = sourceConsumer.receive(1000); + + if (msg instanceof ActiveMQMessage) { + // We need to check the buffer mainly in the case of LargeMessages + // As we need to reconstruct the buffer before resending the message + ((ActiveMQMessage) msg).checkBuffer(); + } + } catch (JMSException jmse) { if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted"); + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " exception while receiving a message", jmse); } - if (stopping) { - return; - } - throw new ActiveMQInterruptedException(e); - } - continue; - } - - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " received message " + msg); - } - - messages.add(msg); - - batchExpiryTime = System.currentTimeMillis() + maxBatchTime; - - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); - } - - if (maxBatchSize != -1 && messages.size() >= maxBatchSize) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " maxBatchSize has been reached so sending batch"); } - sendBatch(); + if (msg == null) { + try { + lock.wait(500); + } catch (InterruptedException e) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted"); + } + if (stopping) { + return; + } + throw new ActiveMQInterruptedException(e); + } + continue; + } if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent batch"); + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " received message " + msg); + } + + messages.add(msg); + + batchExpiryTime = System.currentTimeMillis() + maxBatchTime; + + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); + } + + if (maxBatchSize != -1 && messages.size() >= maxBatchSize) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " maxBatchSize has been reached so sending batch"); + } + + sendBatch(); + + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent batch"); + } } } } + } finally { + finished.countDown(); } } } @@ -1764,8 +1870,9 @@ public final class JMSBridgeImpl implements JMSBridge { ActiveMQJMSBridgeLogger.LOGGER.errorConnectingBridge(bridgeName); try { - stop(); - } catch (Exception ignore) { + stop(true); + } catch (Throwable ignore) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to stop bridge %s from %s ", bridgeName, this.getClass().getSimpleName(), ignore); } } @@ -1785,7 +1892,11 @@ public final class JMSBridgeImpl implements JMSBridge { if (maxRetries > 0 || maxRetries == -1) { ActiveMQJMSBridgeLogger.LOGGER.bridgeRetry(failureRetryInterval, bridgeName); - pause(failureRetryInterval); + if (!pause(failureRetryInterval)) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Interrupted while pausing the bridge %s", bridgeName); + failed(); + return; + } // Now we try ok = setupJMSObjectsWithRetry(); @@ -1839,53 +1950,58 @@ public final class JMSBridgeImpl implements JMSBridge { ActiveMQJMSBridgeLogger.LOGGER.trace(this + " running"); } - synchronized (lock) { - while (started) { - long toWait = batchExpiryTime - System.currentTimeMillis(); + final CountDownLatch completed = batchTimeCheckerFinished; + try { + synchronized (lock) { + while (started) { + long toWait = batchExpiryTime - System.currentTimeMillis(); - if (toWait <= 0) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waited enough"); - } + if (toWait <= 0) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waited enough"); + } - synchronized (lock) { - if (!failed && !messages.isEmpty()) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " got some messages so sending batch"); - } + synchronized (lock) { + if (!failed && !messages.isEmpty()) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " got some messages so sending batch"); + } - sendBatch(); + sendBatch(); - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent batch"); + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent batch"); + } } } + + batchExpiryTime = System.currentTimeMillis() + maxBatchTime; + } else { + try { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waiting for " + toWait); + } + + lock.wait(toWait); + + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " woke up"); + } + } catch (InterruptedException e) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted"); + } + if (stopping) { + return; + } + throw new ActiveMQInterruptedException(e); + } + } - - batchExpiryTime = System.currentTimeMillis() + maxBatchTime; - } else { - try { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waiting for " + toWait); - } - - lock.wait(toWait); - - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " woke up"); - } - } catch (InterruptedException e) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted"); - } - if (stopping) { - return; - } - throw new ActiveMQInterruptedException(e); - } - } } + } finally { + completed.countDown(); } } } @@ -2037,8 +2153,8 @@ public final class JMSBridgeImpl implements JMSBridge { } /* - * make sure we reset the connected flags - * */ + * make sure we reset the connected flags + * */ if (result == FailoverEventType.FAILOVER_COMPLETED) { if (isSource) { connectedSource = true;