diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index aa27581e87..acdf1e8e35 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -579,7 +579,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C sessionsToClose = new HashSet(sessions); } callFailoverListeners(FailoverEventType.FAILOVER_FAILED); - callSessionFailureListeners(me, true, false); + callSessionFailureListeners(me, true, false, scaleDownTargetNodeID); } } finally { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 9853455b36..eb99fcc99d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -565,7 +565,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled catch (Throwable dontCare) { } - if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID)) { + if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID.toString())) { synchronized (this) { try { ActiveMQServerLogger.LOGGER.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID); @@ -587,8 +587,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled else { ActiveMQServerLogger.LOGGER.debug("Received invalid scaleDownTargetNodeID: " + scaleDownTargetNodeID); - //we never fail permanently here, this only happens once all reconnect tries have happened - fail(false); + fail(me.getType() == ActiveMQExceptionType.DISCONNECTED); } tryScheduleRetryReconnect(me.getType()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index c7e70c084d..e336b3300d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -340,7 +340,9 @@ public class ClusterConnectionBridge extends BridgeImpl { @Override protected void tryScheduleRetryReconnect(final ActiveMQExceptionType type) { - scheduleRetryConnect(); + if (type != ActiveMQExceptionType.DISCONNECTED) { + scheduleRetryConnect(); + } } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index c4666b9f58..e42f46b735 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -1428,7 +1428,9 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn ActiveMQServerLogger.LOGGER.debug("Removing record for: " + targetNodeID); MessageFlowRecord record = records.remove(targetNodeID); try { - record.close(); + if (record != null) { + record.close(); + } } catch (Exception e) { e.printStackTrace(); diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java index a623a6d636..9f4cf7eb41 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java @@ -56,9 +56,9 @@ public class ClusteredGroupingTest extends ClusterTestBase { setupServer(0, isFileStorage(), isNetty()); setupServer(1, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 0, 1); - setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 1, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -118,11 +118,11 @@ public class ClusteredGroupingTest extends ClusterTestBase { setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -190,11 +190,11 @@ public class ClusteredGroupingTest extends ClusterTestBase { setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -263,13 +263,13 @@ public class ClusteredGroupingTest extends ClusterTestBase { setupServer(2, isFileStorage(), isNetty()); setupServer(3, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2, 3); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 0, 1, 2, 3); - setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2, 3); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 1, 0, 2, 3); - setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1, 3); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 2, 0, 1, 3); - setupClusterConnection("cluster3", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 3, 1, 2, 3); + setupClusterConnection("cluster3", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 3, 1, 2, 3); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/BindingsClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/BindingsClusterTest.java index 29c0edc7fa..674c9f15d7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/BindingsClusterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/BindingsClusterTest.java @@ -402,28 +402,32 @@ public class BindingsClusterTest extends JMSClusteredTestBase { } private void crash() throws Exception { + /* + * Rather than just calling stop() on the server here we want to simulate an actual node crash or bridge failure + * so the bridge's failure listener needs to get something other than a DISCONNECTED message. In this case we + * simulate a NOT_CONNECTED exception. + */ + final CountDownLatch latch = new CountDownLatch(1); + ClusterConnectionImpl next = (ClusterConnectionImpl) server1.getClusterManager().getClusterConnections().iterator().next(); + BridgeImpl bridge = (BridgeImpl) next.getRecords().values().iterator().next().getBridge(); + RemotingConnection forwardingConnection = getForwardingConnection(bridge); + forwardingConnection.addFailureListener(new FailureListener() { + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver) { + latch.countDown(); + } + + @Override + public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) { + connectionFailed(me, failedOver); + } + }); + forwardingConnection.fail(new ActiveMQNotConnectedException()); + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + if (crash) { jmsServer2.stop(); } - else { - final CountDownLatch latch = new CountDownLatch(1); - ClusterConnectionImpl next = (ClusterConnectionImpl) server1.getClusterManager().getClusterConnections().iterator().next(); - BridgeImpl bridge = (BridgeImpl) next.getRecords().values().iterator().next().getBridge(); - RemotingConnection forwardingConnection = getForwardingConnection(bridge); - forwardingConnection.addFailureListener(new FailureListener() { - @Override - public void connectionFailed(ActiveMQException exception, boolean failedOver) { - latch.countDown(); - } - - @Override - public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) { - connectionFailed(me, failedOver); - } - }); - forwardingConnection.fail(new ActiveMQNotConnectedException()); - assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - } } private void restart() throws Exception {