From f3be78a09633837d59ff1a79fa681b684bcd416c Mon Sep 17 00:00:00 2001 From: jbertram Date: Wed, 5 Aug 2015 12:17:21 -0500 Subject: [PATCH 1/4] ARTEMIS-179 Expose disconnect/reconnect problem Currently a cluster bridge will continue to attempt to reconnect to a node that sends it a DISCONNECT until its reconnect-attempts is exhausted. A DISCONNECT message indicates that the node is not coming back so no reconnect attempt should be made and the bridge should be stopped, the bindings should be cleaned up, etc. The change to this test exposes this problem. --- .../extras/byteman/ClusteredGroupingTest.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java index a623a6d636..9f4cf7eb41 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java @@ -56,9 +56,9 @@ public class ClusteredGroupingTest extends ClusterTestBase { setupServer(0, isFileStorage(), isNetty()); setupServer(1, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 0, 1); - setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 1, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -118,11 +118,11 @@ public class ClusteredGroupingTest extends ClusterTestBase { setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -190,11 +190,11 @@ public class ClusteredGroupingTest extends ClusterTestBase { setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -263,13 +263,13 @@ public class ClusteredGroupingTest extends ClusterTestBase { setupServer(2, isFileStorage(), isNetty()); setupServer(3, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2, 3); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 0, 1, 2, 3); - setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2, 3); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 1, 0, 2, 3); - setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1, 3); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 2, 0, 1, 3); - setupClusterConnection("cluster3", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 3, 1, 2, 3); + setupClusterConnection("cluster3", "queues", MessageLoadBalancingType.ON_DEMAND, 1, -1, 500, isNetty(), 3, 1, 2, 3); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); From 682cad63f3e5c8d59be92602de32e523aae032ca Mon Sep 17 00:00:00 2001 From: jbertram Date: Wed, 5 Aug 2015 12:37:53 -0500 Subject: [PATCH 2/4] ARTEMIS-179 Do not reconnect bridge on DISCONNECT Note: this breaks scale-down because the bindings are removed. --- .../activemq/artemis/core/server/cluster/impl/BridgeImpl.java | 3 +-- .../core/server/cluster/impl/ClusterConnectionBridge.java | 4 +++- .../core/server/cluster/impl/ClusterConnectionImpl.java | 4 +++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 9853455b36..3ef7b24c78 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -587,8 +587,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled else { ActiveMQServerLogger.LOGGER.debug("Received invalid scaleDownTargetNodeID: " + scaleDownTargetNodeID); - //we never fail permanently here, this only happens once all reconnect tries have happened - fail(false); + fail(me.getType() == ActiveMQExceptionType.DISCONNECTED); } tryScheduleRetryReconnect(me.getType()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index c7e70c084d..e336b3300d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -340,7 +340,9 @@ public class ClusterConnectionBridge extends BridgeImpl { @Override protected void tryScheduleRetryReconnect(final ActiveMQExceptionType type) { - scheduleRetryConnect(); + if (type != ActiveMQExceptionType.DISCONNECTED) { + scheduleRetryConnect(); + } } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index c4666b9f58..e42f46b735 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -1428,7 +1428,9 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn ActiveMQServerLogger.LOGGER.debug("Removing record for: " + targetNodeID); MessageFlowRecord record = records.remove(targetNodeID); try { - record.close(); + if (record != null) { + record.close(); + } } catch (Exception e) { e.printStackTrace(); From 3bb88c60ca683f5ff6788c0c5758a96210e346b5 Mon Sep 17 00:00:00 2001 From: jbertram Date: Thu, 6 Aug 2015 16:28:56 -0500 Subject: [PATCH 3/4] ARTEMIS-179 fix scale-down --- .../artemis/core/client/impl/ClientSessionFactoryImpl.java | 2 +- .../activemq/artemis/core/server/cluster/impl/BridgeImpl.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index aa27581e87..acdf1e8e35 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -579,7 +579,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C sessionsToClose = new HashSet(sessions); } callFailoverListeners(FailoverEventType.FAILOVER_FAILED); - callSessionFailureListeners(me, true, false); + callSessionFailureListeners(me, true, false, scaleDownTargetNodeID); } } finally { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 3ef7b24c78..eb99fcc99d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -565,7 +565,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled catch (Throwable dontCare) { } - if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID)) { + if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID.toString())) { synchronized (this) { try { ActiveMQServerLogger.LOGGER.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID); From 38188cdf2e6a1a04c6b900ec6bbc6bd49a406b4b Mon Sep 17 00:00:00 2001 From: jbertram Date: Mon, 10 Aug 2015 16:19:01 -0500 Subject: [PATCH 4/4] ARTEMIS-179 fix BindingsClusterTest --- .../jms/cluster/BindingsClusterTest.java | 42 ++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/BindingsClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/BindingsClusterTest.java index 29c0edc7fa..674c9f15d7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/BindingsClusterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/BindingsClusterTest.java @@ -402,28 +402,32 @@ public class BindingsClusterTest extends JMSClusteredTestBase { } private void crash() throws Exception { + /* + * Rather than just calling stop() on the server here we want to simulate an actual node crash or bridge failure + * so the bridge's failure listener needs to get something other than a DISCONNECTED message. In this case we + * simulate a NOT_CONNECTED exception. + */ + final CountDownLatch latch = new CountDownLatch(1); + ClusterConnectionImpl next = (ClusterConnectionImpl) server1.getClusterManager().getClusterConnections().iterator().next(); + BridgeImpl bridge = (BridgeImpl) next.getRecords().values().iterator().next().getBridge(); + RemotingConnection forwardingConnection = getForwardingConnection(bridge); + forwardingConnection.addFailureListener(new FailureListener() { + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver) { + latch.countDown(); + } + + @Override + public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) { + connectionFailed(me, failedOver); + } + }); + forwardingConnection.fail(new ActiveMQNotConnectedException()); + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + if (crash) { jmsServer2.stop(); } - else { - final CountDownLatch latch = new CountDownLatch(1); - ClusterConnectionImpl next = (ClusterConnectionImpl) server1.getClusterManager().getClusterConnections().iterator().next(); - BridgeImpl bridge = (BridgeImpl) next.getRecords().values().iterator().next().getBridge(); - RemotingConnection forwardingConnection = getForwardingConnection(bridge); - forwardingConnection.addFailureListener(new FailureListener() { - @Override - public void connectionFailed(ActiveMQException exception, boolean failedOver) { - latch.countDown(); - } - - @Override - public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) { - connectionFailed(me, failedOver); - } - }); - forwardingConnection.fail(new ActiveMQNotConnectedException()); - assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); - } } private void restart() throws Exception {