This commit is contained in:
Clebert Suconic 2018-11-14 10:23:57 -05:00
commit c50c493427
3 changed files with 86 additions and 1 deletions

View File

@ -120,7 +120,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
/**
 * Replies to a failover check, telling the requester whether it may fail over to this server.
 * <p>
 * The reply is positive when at least one of the following holds:
 * <ul>
 * <li>the request carries no node ID;</li>
 * <li>the requested node ID is this server's own node ID, in which case the scale-down state is
 * not consulted at all;</li>
 * <li>it is not the case that the HA policy can scale down while the requested node has not been
 * scaled down yet.</li>
 * </ul>
 * The outcome is sent back on {@code channel1} as a {@link CheckFailoverReplyMessage}.
 *
 * @param failoverMessage the incoming check request; its node ID may be {@code null}
 */
private void handleCheckForFailover(CheckFailoverMessage failoverMessage) {
   String nodeID = failoverMessage.getNodeID();
   // A single declaration only: the earlier variant without the own-node-ID short circuit is superseded by this one.
   boolean okToFailover = nodeID == null || server.getNodeID().toString().equals(nodeID) || !(server.getHAPolicy().canScaleDown() && !server.hasScaledDown(new SimpleString(nodeID)));
   channel1.send(new CheckFailoverReplyMessage(okToFailover));
}

View File

@ -311,6 +311,10 @@ public class ClusterController implements ActiveMQComponent {
this.replicatedClusterName = new SimpleString(replicatedClusterName);
}
/**
 * Gives access to the server locators held by this controller.
 *
 * @return the controller's own {@code locators} map (the live instance, not a copy)
 */
public Map<SimpleString, ServerLocatorInternal> getLocators() {
   return locators;
}
/**
* A handler for packets sent between the nodes of the cluster.
*/

View File

@ -16,9 +16,15 @@
*/
package org.apache.activemq.artemis.tests.integration.server;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -29,13 +35,18 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
@ -146,6 +157,76 @@ public class ScaleDownTest extends ClusterTestBase {
removeConsumer(0);
}
/**
 * Restarts server 1 while server 0 holds a cluster connection to it, then checks that the
 * close executor of server 0's session factory still runs submitted tasks.
 */
@Test
public void testScaleDownNodeReconnect() throws Exception {
   try {
      ClusterController controller = servers[0].getClusterManager().getClusterController();

      // Pick the first locator the controller of server 0 knows about.
      Map<SimpleString, ServerLocatorInternal> locatorsMap = controller.getLocators();
      Iterator<Map.Entry<SimpleString, ServerLocatorInternal>> iter = locatorsMap.entrySet().iterator();
      assertTrue(iter.hasNext());
      Map.Entry<SimpleString, ServerLocatorInternal> entry = iter.next();
      ServerLocatorImpl locator = (ServerLocatorImpl) entry.getValue();

      waitForClusterConnected(locator);

      servers[1].stop();
      servers[1].start();

      // By this moment server0 is trying to reconnect to server1.
      // Normally server1 checks whether the scale-down server named in the
      // reconnection has already been scaled down before granting the connection.
      // If that scale-down server is server1 itself, it should grant the
      // connection without checking the scale-down state against it.
      // Otherwise the connection will never be established and, worse,
      // the repeated reconnect attempts will fill ClientSessionFactory's
      // closeExecutor with tasks that keep growing until OOM.
      checkClusterConnectionExecutorNotBlocking(locator);
   } finally {
      // Stop server 0 even when stopping server 1 throws, so no broker is left running.
      try {
         servers[1].stop();
      } finally {
         servers[0].stop();
      }
   }
}
/**
 * Asserts that the {@code closeExecutor} of the locator's single session factory still runs
 * newly submitted tasks.
 * <p>
 * Both the locator's {@code factories} set and the factory's {@code closeExecutor} are read
 * reflectively. A no-op task that counts down a latch is submitted to the executor, and the
 * assertion fails if the latch is not released within 10 seconds.
 *
 * @param locator the locator whose only session factory is inspected
 * @throws NoSuchFieldException   if one of the reflected fields does not exist
 * @throws IllegalAccessException if one of the reflected fields cannot be read
 */
private void checkClusterConnectionExecutorNotBlocking(ServerLocatorImpl locator) throws NoSuchFieldException, IllegalAccessException {
   // Resolve the fields on the declaring classes: getDeclaredField does not search
   // superclasses, so going through getClass() would fail for a subclass instance.
   Field factoriesField = ServerLocatorImpl.class.getDeclaredField("factories");
   factoriesField.setAccessible(true);
   Set<?> factories = (Set<?>) factoriesField.get(locator);
   assertEquals(1, factories.size());

   ClientSessionFactoryImpl factory = (ClientSessionFactoryImpl) factories.iterator().next();
   Field executorField = ClientSessionFactoryImpl.class.getDeclaredField("closeExecutor");
   executorField.setAccessible(true);
   Executor pool = (Executor) executorField.get(factory);

   final CountDownLatch latch = new CountDownLatch(1);
   pool.execute(latch::countDown);

   boolean result = false;
   try {
      result = latch.await(10, TimeUnit.SECONDS);
   } catch (InterruptedException e) {
      // Keep the interrupt visible to the caller; result stays false, so the assertion below fails.
      Thread.currentThread().interrupt();
   }
   assertTrue("executor got blocked.", result);
}
/**
 * Waits until the locator's topology is no longer empty and fails the test if that does not
 * happen within the wait limit of 5000 (presumably milliseconds; confirm against {@code Wait}).
 *
 * @param locator the locator whose topology is polled
 * @throws Exception if the wait itself fails
 */
private void waitForClusterConnected(ServerLocatorImpl locator) throws Exception {
   boolean topologyKnown = Wait.waitFor(() -> !locator.getTopology().isEmpty(), 5000);
   assertTrue("topology should not be empty", topologyKnown);
}
@Test
public void testStoreAndForward() throws Exception {
final int TEST_SIZE = 50;