This commit is contained in:
Clebert Suconic 2020-11-20 14:48:40 -05:00
commit 1529c518b3
2 changed files with 104 additions and 24 deletions

View File

@ -168,28 +168,14 @@ public class ScaleDownHandler {
}
// compile a list of all the relevant queues and queue iterators for this address
RoutingType routingType;
Integer routingTypeOrdinal;
String routingTypeString = "";
for (Queue loopQueue : queues) {
logger.debug("Scaling down messages on address " + address + " / performing loop on queue " + loopQueue);
try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.browserIterator()) {
routingType = loopQueue.getRoutingType();
if (null != routingType) {
routingTypeOrdinal = routingType.ordinal();
routingTypeString = routingTypeOrdinal.toString();
}
while (messagesIterator.hasNext()) {
MessageReference messageReference = messagesIterator.next();
Message originalMessage = messageReference.getMessage();
if (null != routingType) {
originalMessage.putStringProperty(Message.HDR_ROUTING_TYPE.toString(), routingTypeString);
}
Message message = originalMessage.copy();
Message message = messageReference.getMessage().copy();
logger.debug("Reading message " + message + " from queue " + loopQueue);
Set<QueuesXRefInnerManager> queuesFound = new HashSet<>();
@ -199,7 +185,7 @@ public class ScaleDownHandler {
// no need to lookup on itself, we just add it
queuesFound.add(controlEntry.getValue());
} else if (controlEntry.getValue().lookup(messageReference)) {
logger.debug("Message existed on queue " + controlEntry.getKey().getID() + " removeID=" + controlEntry.getValue().getQueueID(message));
logger.debug("Message existed on queue " + controlEntry.getKey().getID() + " removeID=" + controlEntry.getValue().getQueueID());
queuesFound.add(controlEntry.getValue());
}
}
@ -208,7 +194,7 @@ public class ScaleDownHandler {
ByteBuffer buffer = ByteBuffer.allocate(queuesFound.size() * 8);
for (QueuesXRefInnerManager control : queuesFound) {
long queueID = control.getQueueID(message);
long queueID = control.getQueueID();
buffer.putLong(queueID);
}
@ -546,10 +532,10 @@ public class ScaleDownHandler {
return queue;
}
public long getQueueID(Message message) throws Exception {
public long getQueueID() throws Exception {
if (targetQueueID < 0) {
targetQueueID = createQueueWithRoutingTypeIfNecessaryAndGetID(clientSession, queue, queue.getAddress(), message.getRoutingType());
targetQueueID = createQueueWithRoutingTypeIfNecessaryAndGetID(clientSession, queue, queue.getAddress(), queue.getRoutingType());
}
return targetQueueID;
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.server;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
@ -49,6 +50,11 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -60,6 +66,8 @@ import static org.apache.activemq.artemis.utils.collections.IterableStream.itera
@RunWith(value = Parameterized.class)
public class ScaleDownTest extends ClusterTestBase {
private static final String AMQP_ACCEPTOR_URI = "tcp://127.0.0.1:5672";
private boolean useScaleDownGroupName;
// this will ensure that all tests in this class are run twice,
@ -79,6 +87,7 @@ public class ScaleDownTest extends ClusterTestBase {
super.setUp();
setupLiveServer(0, isFileStorage(), isNetty(), true);
setupLiveServer(1, isFileStorage(), isNetty(), true);
servers[0].getConfiguration().addAcceptorConfiguration("amqp", AMQP_ACCEPTOR_URI + "?protocols=AMQP");
LiveOnlyPolicyConfiguration haPolicyConfiguration0 = (LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration();
haPolicyConfiguration0.setScaleDownConfiguration(new ScaleDownConfiguration());
LiveOnlyPolicyConfiguration haPolicyConfiguration1 = (LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration();
@ -102,6 +111,11 @@ public class ScaleDownTest extends ClusterTestBase {
return true;
}
@Override
protected boolean isResolveProtocols() {
return true;
}
@Test
public void testBasicScaleDown() throws Exception {
final int TEST_SIZE = 2;
@ -379,11 +393,10 @@ public class ScaleDownTest extends ClusterTestBase {
public void testScaleDownWithMissingAnycastQueue() throws Exception {
final int TEST_SIZE = 2;
final String addressName = "testAddress";
final String queueName1 = "testQueue1";
final String queueName2 = "testQueue2";
final String queueName = "testQueue";
// create 2 queues on each node mapped to the same address
createQueue(0, addressName, queueName2, null, false, null, null, RoutingType.ANYCAST);
createQueue(0, addressName, queueName, null, false, null, null, RoutingType.ANYCAST);
// send messages to node 0
send(0, addressName, TEST_SIZE, false, null);
@ -391,15 +404,96 @@ public class ScaleDownTest extends ClusterTestBase {
// trigger scaleDown from node 0 to node 1
servers[0].stop();
Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName2))).getBindable()).getRoutingType(), RoutingType.ANYCAST);
Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName))).getBindable()).getRoutingType(), RoutingType.ANYCAST);
// get the 1 message from queue 2
addConsumer(0, 1, queueName2, null);
addConsumer(0, 1, queueName, null);
ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage);
clientMessage.acknowledge();
}
private void sendAMQPMessages(final String address, final int numMessages, final boolean durable) throws Exception {
AmqpClient client = new AmqpClient(new URI(AMQP_ACCEPTOR_URI), "admin", "admin");
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(address);
for (int i = 0; i < numMessages; ++i) {
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:" + i);
message.setDurable(durable);
sender.send(message);
}
} finally {
connection.close();
}
}
@Test
public void testScaleDownAMQPMessagesWithMissingAnycastQueue() throws Exception {
final int TEST_SIZE = 2;
final String addressName = "testAddress";
final String queueName = "testQueue";
// create 2 queues on each node mapped to the same address
createQueue(0, addressName, queueName, null, false, null, null, RoutingType.ANYCAST);
// send messages to node 0
sendAMQPMessages(addressName, TEST_SIZE, false);
// trigger scaleDown from node 0 to node 1
servers[0].stop();
Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName))).getBindable()).getRoutingType(), RoutingType.ANYCAST);
// get the 1 message from queue 2
addConsumer(0, 1, queueName, null);
ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage);
clientMessage.acknowledge();
}
@Test
public void testScaleDownAMQPMessagesWithMissingMulticastQueues() throws Exception {
final int TEST_SIZE = 2;
ClientMessage clientMessage;
final String addressName = "testAddress";
final String queueName1 = "testQueue1";
final String queueName2 = "testQueue2";
// create 2 queues on each node mapped to the same address
createQueue(0, addressName, queueName1, null, false, null, null, RoutingType.MULTICAST);
createQueue(0, addressName, queueName2, null, false, null, null, RoutingType.MULTICAST);
// send messages to node 0
sendAMQPMessages(addressName, TEST_SIZE, false);
Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getBindable()).getMessageCount(), 2);
Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName2))).getBindable()).getMessageCount(), 2);
// trigger scaleDown from node 0 to node 1
servers[0].stop();
Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getBindable()).getRoutingType(), RoutingType.MULTICAST);
Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName2))).getBindable()).getRoutingType(), RoutingType.MULTICAST);
Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getBindable()).getMessageCount(), 2);
Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName2))).getBindable()).getMessageCount(), 2);
// get the 1 message from queue 1
addConsumer(0, 1, queueName1, null);
clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage);
clientMessage.acknowledge();
// get the 1 message from queue 2
addConsumer(1, 1, queueName2, null);
clientMessage = consumers[1].getConsumer().receive(250);
Assert.assertNotNull(clientMessage);
clientMessage.acknowledge();
}
@Test
public void testMessageProperties() throws Exception {
final int TEST_SIZE = 5;