ARTEMIS-2462 Applying fix on delete SNF queue after ScaleDown

This commit is contained in:
Clebert Suconic 2019-09-16 22:27:06 -04:00
parent dd20f89bd0
commit b846f356bb
8 changed files with 76 additions and 110 deletions

View File

@ -202,6 +202,9 @@ public interface Queue extends Bindable,CriticalComponent {
void deleteQueue(boolean removeConsumers) throws Exception;
/** This method will push a removeAddress call into server's remove address */
void removeAddress() throws Exception;
void destroyPaging() throws Exception;
long getMessageCount();

View File

@ -723,30 +723,35 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID.toString())) {
synchronized (this) {
try {
logger.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID);
((QueueImpl) queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(scaleDownTargetNodeID));
// stop the bridge from trying to reconnect and clean up all the bindings
fail(true);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
}
scaleDown(scaleDownTargetNodeID);
} else if (scaleDownTargetNodeID != null) {
// the disconnected node is scaling down to me, no need to reconnect to it
logger.debug("Received scaleDownTargetNodeID: " + scaleDownTargetNodeID + "; cancelling reconnect.");
fail(true);
fail(true, true);
} else {
logger.debug("Received invalid scaleDownTargetNodeID: " + scaleDownTargetNodeID);
fail(me.getType() == ActiveMQExceptionType.DISCONNECTED);
fail(me.getType() == ActiveMQExceptionType.DISCONNECTED, false);
}
tryScheduleRetryReconnect(me.getType());
}
protected void scaleDown(String scaleDownTargetNodeID) {
synchronized (this) {
try {
logger.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID);
((QueueImpl) queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(scaleDownTargetNodeID));
// stop the bridge from trying to reconnect and clean up all the bindings
fail(true, true);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
}
}
protected void tryScheduleRetryReconnect(final ActiveMQExceptionType type) {
scheduleRetryConnect();
}
@ -865,7 +870,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return transformer;
}
protected void fail(final boolean permanently) {
protected void fail(final boolean permanently, boolean scaleDown) {
logger.debug(this + "\n\t::fail being called, permanently=" + permanently);
//we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly
if (targetNodeID != null) {
@ -1050,7 +1055,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} catch (Throwable ignored) {
}
}
fail(false);
fail(false, false);
scheduleRetryConnect();
}
}
@ -1069,7 +1074,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttemptsInUse) {
ActiveMQServerLogger.LOGGER.bridgeAbortStart(name, retryCount, reconnectAttempts);
fail(true);
fail(true, false);
return;
}

View File

@ -397,13 +397,23 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
@Override
protected void fail(final boolean permanently) {
protected void fail(final boolean permanently, final boolean scaleDown) {
logger.debug("Cluster Bridge " + this.getName() + " failed, permanently=" + permanently);
super.fail(permanently);
super.fail(permanently, scaleDown);
if (permanently) {
logger.debug("cluster node for bridge " + this.getName() + " is permanently down");
clusterConnection.removeRecord(targetNodeID);
if (scaleDown) {
try {
queue.deleteQueue(true);
queue.removeAddress();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
} else {
clusterConnection.disconnectRecord(targetNodeID);
}

View File

@ -2075,6 +2075,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
deleteQueue(false);
}
@Override
public void removeAddress() throws Exception {
server.removeAddressInfo(getAddress(), null);
}
@Override
public void deleteQueue(boolean removeConsumers) throws Exception {
synchronized (this) {

View File

@ -794,6 +794,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void removeAddress() throws Exception {
}
@Override
public long getAcknowledgeAttempts() {
return 0;

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -76,6 +77,10 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
IntegrationTestLogger.LOGGER.info("Node 1: " + servers[1].getClusterManager().getNodeId());
IntegrationTestLogger.LOGGER.info("Node 2: " + servers[2].getClusterManager().getNodeId());
IntegrationTestLogger.LOGGER.info("===============================");
servers[0].setIdentity("Node0");
servers[1].setIdentity("Node1");
servers[2].setIdentity("Node2");
}
protected boolean isNetty() {
@ -117,7 +122,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
createQueue(2, addressName, queueName1, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
// pause the SnF queue so that when the server tries to redistribute a message it won't actually go across the cluster bridge
String snfAddress = servers[0].getInternalNamingPrefix() + "sf.cluster0." + servers[0].getNodeID().toString();
final String snfAddress = servers[0].getInternalNamingPrefix() + "sf.cluster0." + servers[0].getNodeID().toString();
Queue snfQueue = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))).getQueue();
snfQueue.pause();
@ -156,20 +161,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
// add a consumer to node 0 to trigger redistribution here
addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
// allow some time for redistribution to move the message to the SnF queue
long timeout = 10000;
long start = System.currentTimeMillis();
long messageCount = 0;
while (System.currentTimeMillis() - start < timeout) {
// ensure the message is not in the queue on node 2
messageCount = getMessageCount(snfQueue);
if (messageCount < TEST_SIZE) {
Thread.sleep(200);
} else {
break;
}
}
Wait.assertEquals(TEST_SIZE, snfQueue::getMessageCount);
// ensure the message is in the SnF queue
Assert.assertEquals(TEST_SIZE, getMessageCount(snfQueue));
@ -179,37 +171,16 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
removeConsumer(0);
servers[0].stop();
start = System.currentTimeMillis();
Queue queueServer2 = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue();
while (System.currentTimeMillis() - start < timeout) {
// ensure the message is not in the queue on node 2
messageCount = getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
if (messageCount > 0) {
Thread.sleep(200);
} else {
break;
}
}
Assert.assertEquals(0, messageCount);
Wait.assertEquals(0, queueServer2::getMessageCount);
// get the messages from queue 1 on node 1
addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
// allow some time for redistribution to move the message to node 1
start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout) {
// ensure the message is not in the queue on node 2
messageCount = getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
if (messageCount < TEST_SIZE) {
Thread.sleep(200);
} else {
break;
}
}
// ensure the message is in queue 1 on node 1 as expected
Assert.assertEquals(TEST_SIZE, messageCount);
Queue queueServer1 = ((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue();
Wait.assertEquals(TEST_SIZE, queueServer1::getMessageCount);
for (int i = 0; i < TEST_SIZE; i++) {
ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
@ -229,6 +200,12 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNull(clientMessage);
removeConsumer(0);
Wait.assertTrue(() -> (servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))) == null);
Wait.assertTrue(() -> (servers[1].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))) == null);
Assert.assertFalse(servers[1].queueQuery(SimpleString.toSimpleString(snfAddress)).isExists());
Assert.assertFalse(servers[1].addressQuery(SimpleString.toSimpleString(snfAddress)).isExists());
}
@Test
@ -278,23 +255,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
addConsumer(1, 0, queueName3, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
// allow some time for redistribution to move the message to the SnF queue
long timeout = 10000;
long start = System.currentTimeMillis();
long messageCount = 0;
while (System.currentTimeMillis() - start < timeout) {
// ensure the message is not in the queue on node 2
messageCount = getMessageCount(snfQueue);
if (messageCount < TEST_SIZE * 2) {
Thread.sleep(200);
} else {
break;
}
}
// ensure the message is in the SnF queue
Assert.assertEquals(TEST_SIZE * 2, getMessageCount(snfQueue));
Wait.assertEquals(TEST_SIZE * 2, snfQueue::getMessageCount);
// trigger scaleDown from node 0 to node 1
IntegrationTestLogger.LOGGER.info("============ Stopping " + servers[0].getNodeID());
@ -302,20 +264,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
removeConsumer(1);
servers[0].stop();
start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout) {
// ensure the messages are not in the queues on node 2
messageCount = getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
messageCount += getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue());
if (messageCount > 0) {
Thread.sleep(200);
} else {
break;
}
}
Assert.assertEquals(0, messageCount);
Wait.assertEquals(0, () -> getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()) +
getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()));
Assert.assertEquals(TEST_SIZE, getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
@ -323,21 +273,9 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
addConsumer(1, 1, queueName3, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
// allow some time for redistribution to move the message to node 1
start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout) {
// ensure the message is not in the queue on node 2
messageCount = getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
messageCount += getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue());
if (messageCount < TEST_SIZE * 2) {
Thread.sleep(200);
} else {
break;
}
}
Wait.assertEquals(TEST_SIZE * 2, () -> getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()) +
getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()));
// ensure the message is in queue 1 on node 1 as expected
Assert.assertEquals(TEST_SIZE * 2, messageCount);
for (int i = 0; i < TEST_SIZE; i++) {
ClientMessage clientMessage = consumers[0].getConsumer().receive(1000);

View File

@ -21,8 +21,6 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -100,8 +98,8 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase {
SimpleString sfQueueName = clusterconn1.getSfQueueName(servers[0].getNodeID().toString());
System.out.println("[sf queue on server 1]: " + sfQueueName);
QueueQueryResult result = servers[1].queueQuery(sfQueueName);
assertTrue(result.isExists());
Assert.assertTrue(servers[1].queueQuery(sfQueueName).isExists());
// trigger scaleDown from node 0 to node 1
servers[0].stop();
@ -117,10 +115,8 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase {
removeConsumer(0);
//check
result = servers[1].queueQuery(sfQueueName);
AddressQueryResult result2 = servers[1].addressQuery(sfQueueName);
assertFalse(result.isExists());
assertFalse(result2.isExists());
Assert.assertFalse(servers[1].queueQuery(sfQueueName).isExists());
Assert.assertFalse(servers[1].addressQuery(sfQueueName).isExists());
}

View File

@ -59,6 +59,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void removeAddress() throws Exception {
}
@Override
public long getDelayBeforeDispatch() {
return 0;