This closes #117 ARTEMIS-179 on reconnection issues

Each commit part of this merge was left here distinct so there would be a record of why each specific change was made since they had potentially far-reaching consequences.
This commit is contained in:
Clebert Suconic 2015-08-11 11:21:56 -04:00
commit 126be4ce24
6 changed files with 44 additions and 37 deletions

View File

@ -579,7 +579,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
sessionsToClose = new HashSet<ClientSessionInternal>(sessions); sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
} }
callFailoverListeners(FailoverEventType.FAILOVER_FAILED); callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
callSessionFailureListeners(me, true, false); callSessionFailureListeners(me, true, false, scaleDownTargetNodeID);
} }
} }
finally { finally {

View File

@ -565,7 +565,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
catch (Throwable dontCare) { catch (Throwable dontCare) {
} }
if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID)) { if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID.toString())) {
synchronized (this) { synchronized (this) {
try { try {
ActiveMQServerLogger.LOGGER.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID); ActiveMQServerLogger.LOGGER.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID);
@ -587,8 +587,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
else { else {
ActiveMQServerLogger.LOGGER.debug("Received invalid scaleDownTargetNodeID: " + scaleDownTargetNodeID); ActiveMQServerLogger.LOGGER.debug("Received invalid scaleDownTargetNodeID: " + scaleDownTargetNodeID);
//we never fail permanently here, this only happens once all reconnect tries have happened fail(me.getType() == ActiveMQExceptionType.DISCONNECTED);
fail(false);
} }
tryScheduleRetryReconnect(me.getType()); tryScheduleRetryReconnect(me.getType());

View File

@ -340,7 +340,9 @@ public class ClusterConnectionBridge extends BridgeImpl {
@Override @Override
protected void tryScheduleRetryReconnect(final ActiveMQExceptionType type) { protected void tryScheduleRetryReconnect(final ActiveMQExceptionType type) {
scheduleRetryConnect(); if (type != ActiveMQExceptionType.DISCONNECTED) {
scheduleRetryConnect();
}
} }
@Override @Override

View File

@ -1428,7 +1428,9 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
ActiveMQServerLogger.LOGGER.debug("Removing record for: " + targetNodeID); ActiveMQServerLogger.LOGGER.debug("Removing record for: " + targetNodeID);
MessageFlowRecord record = records.remove(targetNodeID); MessageFlowRecord record = records.remove(targetNodeID);
try { try {
record.close(); if (record != null) {
record.close();
}
} }
catch (Exception e) { catch (Exception e) {
e.printStackTrace(); e.printStackTrace();

View File

@ -56,9 +56,9 @@ public class ClusteredGroupingTest extends ClusterTestBase {
setupServer(0, isFileStorage(), isNetty()); setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty()); setupServer(1, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1); setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0); setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 1, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@ -118,11 +118,11 @@ public class ClusteredGroupingTest extends ClusterTestBase {
setupServer(1, isFileStorage(), isNetty()); setupServer(1, isFileStorage(), isNetty());
setupServer(2, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2); setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 0, 1, 2);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2); setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 1, 0, 2);
setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1); setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 2, 0, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@ -190,11 +190,11 @@ public class ClusteredGroupingTest extends ClusterTestBase {
setupServer(1, isFileStorage(), isNetty()); setupServer(1, isFileStorage(), isNetty());
setupServer(2, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2); setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 0, 1, 2);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2); setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 1, 0, 2);
setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1); setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 2, 0, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@ -263,13 +263,13 @@ public class ClusteredGroupingTest extends ClusterTestBase {
setupServer(2, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty());
setupServer(3, isFileStorage(), isNetty()); setupServer(3, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2, 3); setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 0, 1, 2, 3);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2, 3); setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 1, 0, 2, 3);
setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1, 3); setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 2, 0, 1, 3);
setupClusterConnection("cluster3", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 3, 1, 2, 3); setupClusterConnection("cluster3", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 3, 1, 2, 3);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);

View File

@ -402,28 +402,32 @@ public class BindingsClusterTest extends JMSClusteredTestBase {
} }
private void crash() throws Exception { private void crash() throws Exception {
/*
* Rather than just calling stop() on the server here we want to simulate an actual node crash or bridge failure
* so the bridge's failure listener needs to get something other than a DISCONNECTED message. In this case we
* simulate a NOT_CONNECTED exception.
*/
final CountDownLatch latch = new CountDownLatch(1);
ClusterConnectionImpl next = (ClusterConnectionImpl) server1.getClusterManager().getClusterConnections().iterator().next();
BridgeImpl bridge = (BridgeImpl) next.getRecords().values().iterator().next().getBridge();
RemotingConnection forwardingConnection = getForwardingConnection(bridge);
forwardingConnection.addFailureListener(new FailureListener() {
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
latch.countDown();
}
@Override
public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
connectionFailed(me, failedOver);
}
});
forwardingConnection.fail(new ActiveMQNotConnectedException());
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
if (crash) { if (crash) {
jmsServer2.stop(); jmsServer2.stop();
} }
else {
final CountDownLatch latch = new CountDownLatch(1);
ClusterConnectionImpl next = (ClusterConnectionImpl) server1.getClusterManager().getClusterConnections().iterator().next();
BridgeImpl bridge = (BridgeImpl) next.getRecords().values().iterator().next().getBridge();
RemotingConnection forwardingConnection = getForwardingConnection(bridge);
forwardingConnection.addFailureListener(new FailureListener() {
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
latch.countDown();
}
@Override
public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
connectionFailed(me, failedOver);
}
});
forwardingConnection.fail(new ActiveMQNotConnectedException());
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
}
} }
private void restart() throws Exception { private void restart() throws Exception {