ARTEMIS-2174 Broker reconnect cause OOM with scale down

When a node tries to reconnects to another node in a scale down cluster,
the reconnect request gets denied by the other node and keeps retrying,
which causes tasks in the ordered executor accumulate and eventually OOM.

The fix is to change the ActiveMQPacketHandler#handleCheckForFailover
to allow reconnect if the scale down node is the node itself.
This commit is contained in:
Howard Gao 2018-11-14 19:21:48 +08:00 committed by Clebert Suconic
parent 256e7c8aae
commit 6e89b22eaa
3 changed files with 86 additions and 1 deletions

View File

@ -120,7 +120,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
private void handleCheckForFailover(CheckFailoverMessage failoverMessage) {
String nodeID = failoverMessage.getNodeID();
boolean okToFailover = nodeID == null || !(server.getHAPolicy().canScaleDown() && !server.hasScaledDown(new SimpleString(nodeID)));
boolean okToFailover = nodeID == null || server.getNodeID().toString().equals(nodeID) || !(server.getHAPolicy().canScaleDown() && !server.hasScaledDown(new SimpleString(nodeID)));
channel1.send(new CheckFailoverReplyMessage(okToFailover));
}

View File

@ -311,6 +311,10 @@ public class ClusterController implements ActiveMQComponent {
this.replicatedClusterName = new SimpleString(replicatedClusterName);
}
public Map<SimpleString, ServerLocatorInternal> getLocators() {
return this.locators;
}
/**
* a handler for handling packets sent between the cluster.
*/

View File

@ -16,9 +16,15 @@
*/
package org.apache.activemq.artemis.tests.integration.server;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -29,13 +35,18 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
@ -146,6 +157,76 @@ public class ScaleDownTest extends ClusterTestBase {
removeConsumer(0);
}
@Test
public void testScaleDownNodeReconnect() throws Exception {
try {
ClusterController controller = servers[0].getClusterManager().getClusterController();
Map<SimpleString, ServerLocatorInternal> locatorsMap = controller.getLocators();
Iterator<Map.Entry<SimpleString, ServerLocatorInternal>> iter = locatorsMap.entrySet().iterator();
assertTrue(iter.hasNext());
Map.Entry<SimpleString, ServerLocatorInternal> entry = iter.next();
ServerLocatorImpl locator = (ServerLocatorImpl) entry.getValue();
waitForClusterConnected(locator);
servers[1].stop();
servers[1].start();
//by this moment server0 is trying to reconnect to server1
//In normal case server1 will check if the reconnection's scaleDown
//server has been scaled down before granting the connection.
//but if the scaleDown is server1 itself, it should grant
//the connection without checking scaledown state against it.
//Otherwise the connection will never be estabilished, and more,
//the repetitive reconnect attempts will cause
//ClientSessionFactory's closeExecutor to be filled with
//tasks that keep growing until OOM.
checkClusterConnectionExecutorNotBlocking(locator);
} finally {
servers[1].stop();
servers[0].stop();
}
}
private void checkClusterConnectionExecutorNotBlocking(ServerLocatorImpl locator) throws NoSuchFieldException, IllegalAccessException {
Field factoriesField = locator.getClass().getDeclaredField("factories");
factoriesField.setAccessible(true);
Set factories = (Set) factoriesField.get(locator);
assertEquals(1, factories.size());
ClientSessionFactoryImpl factory = (ClientSessionFactoryImpl) factories.iterator().next();
Field executorField = factory.getClass().getDeclaredField("closeExecutor");
executorField.setAccessible(true);
Executor pool = (Executor) executorField.get(factory);
final CountDownLatch latch = new CountDownLatch(1);
pool.execute(()->
latch.countDown()
);
boolean result = false;
try {
result = latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
assertTrue("executor got blocked.", result);
}
private void waitForClusterConnected(ServerLocatorImpl locator) throws Exception {
boolean result = Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return !locator.getTopology().isEmpty();
}
}, 5000);
assertTrue("topology should not be empty", result);
}
@Test
public void testStoreAndForward() throws Exception {
final int TEST_SIZE = 50;