mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-2632 - dispose the old transport listener before rebalancing
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1079384 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8fba621a08
commit
6c09e846b1
|
@ -205,6 +205,11 @@ public class FailoverTransport implements CompositeTransport {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public final void disposeTransport(Transport transport) {
|
||||||
|
transport.setTransportListener(disposedListener);
|
||||||
|
ServiceSupport.dispose(transport);
|
||||||
|
}
|
||||||
|
|
||||||
public final void handleTransportFailure(IOException e) throws InterruptedException {
|
public final void handleTransportFailure(IOException e) throws InterruptedException {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(this + " handleTransportFailure: " + e);
|
LOG.trace(this + " handleTransportFailure: " + e);
|
||||||
|
@ -218,8 +223,7 @@ public class FailoverTransport implements CompositeTransport {
|
||||||
}
|
}
|
||||||
if (transport != null) {
|
if (transport != null) {
|
||||||
|
|
||||||
transport.setTransportListener(disposedListener);
|
disposeTransport(transport);
|
||||||
ServiceSupport.dispose(transport);
|
|
||||||
|
|
||||||
boolean reconnectOk = false;
|
boolean reconnectOk = false;
|
||||||
synchronized (reconnectMutex) {
|
synchronized (reconnectMutex) {
|
||||||
|
@ -808,7 +812,7 @@ public class FailoverTransport implements CompositeTransport {
|
||||||
try {
|
try {
|
||||||
Transport transport = this.connectedTransport.getAndSet(null);
|
Transport transport = this.connectedTransport.getAndSet(null);
|
||||||
if (transport != null) {
|
if (transport != null) {
|
||||||
transport.stop();
|
disposeTransport(transport);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.debug("Caught an exception stopping existing transport for rebalance", e);
|
LOG.debug("Caught an exception stopping existing transport for rebalance", e);
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class FailoverClusterTest extends TestCase {
|
||||||
if (brokerB == null) {
|
if (brokerB == null) {
|
||||||
brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
|
brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
|
||||||
}
|
}
|
||||||
Thread.sleep(5000);
|
Thread.sleep(3000);
|
||||||
Set<String> set = new HashSet<String>();
|
Set<String> set = new HashSet<String>();
|
||||||
for (ActiveMQConnection c : connections) {
|
for (ActiveMQConnection c : connections) {
|
||||||
set.add(c.getTransportChannel().getRemoteAddress());
|
set.add(c.getTransportChannel().getRemoteAddress());
|
||||||
|
@ -66,7 +66,7 @@ public class FailoverClusterTest extends TestCase {
|
||||||
// add in server side only url param, should not be propagated
|
// add in server side only url param, should not be propagated
|
||||||
brokerB = createBrokerB(BROKER_B_BIND_ADDRESS + "?transport.closeAsync=false");
|
brokerB = createBrokerB(BROKER_B_BIND_ADDRESS + "?transport.closeAsync=false");
|
||||||
}
|
}
|
||||||
Thread.sleep(5000);
|
Thread.sleep(3000);
|
||||||
Set<String> set = new HashSet<String>();
|
Set<String> set = new HashSet<String>();
|
||||||
for (ActiveMQConnection c : connections) {
|
for (ActiveMQConnection c : connections) {
|
||||||
set.add(c.getTransportChannel().getRemoteAddress());
|
set.add(c.getTransportChannel().getRemoteAddress());
|
||||||
|
|
Loading…
Reference in New Issue