mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@924350 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
be55d0a661
commit
de04214bb5
|
@ -186,7 +186,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
private DestinationSource destinationSource;
|
||||
private final Object ensureConnectionInfoSentMutex = new Object();
|
||||
private boolean useDedicatedTaskRunner;
|
||||
protected CountDownLatch transportInterruptionProcessingComplete;
|
||||
protected volatile CountDownLatch transportInterruptionProcessingComplete;
|
||||
private long consumerFailoverRedeliveryWaitPeriod;
|
||||
|
||||
/**
|
||||
|
@ -1840,7 +1840,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
}
|
||||
|
||||
public void transportInterupted() {
|
||||
transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
|
||||
this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
|
||||
}
|
||||
|
@ -2245,17 +2245,20 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
}
|
||||
|
||||
protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
|
||||
if (transportInterruptionProcessingComplete != null) {
|
||||
while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(10, TimeUnit.SECONDS)) {
|
||||
LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + transportInterruptionProcessingComplete.getCount() + ") to complete..");
|
||||
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
|
||||
if (cdl != null) {
|
||||
if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
|
||||
LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
|
||||
cdl.await(10, TimeUnit.SECONDS);
|
||||
}
|
||||
signalInterruptionProcessingComplete();
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized void transportInterruptionProcessingComplete() {
|
||||
if (transportInterruptionProcessingComplete != null) {
|
||||
transportInterruptionProcessingComplete.countDown();
|
||||
protected void transportInterruptionProcessingComplete() {
|
||||
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
|
||||
if (cdl != null) {
|
||||
cdl.countDown();
|
||||
try {
|
||||
signalInterruptionProcessingComplete();
|
||||
} catch (InterruptedException ignored) {}
|
||||
|
@ -2263,20 +2266,22 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
|||
}
|
||||
|
||||
private void signalInterruptionProcessingComplete() throws InterruptedException {
|
||||
if (transportInterruptionProcessingComplete.await(0, TimeUnit.SECONDS)) {
|
||||
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
|
||||
if (cdl.getCount()==0) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
|
||||
}
|
||||
synchronized (this) {
|
||||
transportInterruptionProcessingComplete = null;
|
||||
FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
|
||||
if (failoverTransport != null) {
|
||||
failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("notified failover transport (" + failoverTransport +") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
|
||||
}
|
||||
}
|
||||
this.transportInterruptionProcessingComplete = null;
|
||||
|
||||
FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
|
||||
if (failoverTransport != null) {
|
||||
failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("notified failover transport (" + failoverTransport
|
||||
+ ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue