ARTEMIS-4684 Internal Queues should not redistribute

This is particularly true for the Mirrored SNF queue. Redistribution is not meant for internal queues. If an internal queue happens to have the same name on another server, it should not trigger redistribution when consumers are removed.

It would be possible to work around this by adding an address-setting specific to the address with redistribution disabled.

ClusteredMirrorSoakTest was intermittently failing because of this. For a few seconds while the mirror connection is still being made connections could move messages from one node towards another node if both have the same name.
This commit is contained in:
Clebert Suconic 2024-03-12 19:05:00 -04:00 committed by clebertsuconic
parent c5b81d929d
commit 11b7671960
4 changed files with 39 additions and 6 deletions

View File

@ -170,6 +170,10 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
assert snfQueue != null;
this.replicaConfig = replicaConfig;
this.snfQueue = snfQueue;
if (!snfQueue.isInternalQueue()) {
logger.debug("marking queue {} as internal to avoid redistribution kicking in", snfQueue.getName());
snfQueue.setInternalQueue(true); // to avoid redistribution kicking in
}
this.server = server;
this.idSupplier = protonProtocolManager.getReferenceIDSupplier();
this.addQueues = replicaConfig.isQueueCreation();

View File

@ -1594,8 +1594,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return supports;
}
public synchronized Redistributor getRedistributor() {
return redistributor == null ? null : redistributor.consumer;
}
@Override
public synchronized void addRedistributor(final long delay) {
if (isInternalQueue()) {
logger.debug("Queue {} is internal, can't be redistributed!", this.name);
return;
}
clearRedistributorFuture();
if (redistributor != null) {

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@ -68,10 +69,10 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
largeBody = writer.toString();
}
public static final String DC1_NODE_A = "mirror/DC1/A";
public static final String DC2_NODE_A = "mirror/DC2/A";
public static final String DC1_NODE_B = "mirror/DC1/B";
public static final String DC2_NODE_B = "mirror/DC2/B";
public static final String DC1_NODE_A = "ClusteredMirrorSoakTest/DC1/A";
public static final String DC2_NODE_A = "ClusteredMirrorSoakTest/DC2/A";
public static final String DC1_NODE_B = "ClusteredMirrorSoakTest/DC1/B";
public static final String DC2_NODE_B = "ClusteredMirrorSoakTest/DC2/B";
Process processDC1_node_A;
Process processDC1_node_B;
@ -115,6 +116,16 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by ClusteredMirrorSoakTest.java --> \n"));
}
@Before
public void cleanupServers() {
cleanupData(DC1_NODE_A);
cleanupData(DC2_NODE_A);
cleanupData(DC2_NODE_B);
cleanupData(DC2_NODE_B);
}
@BeforeClass
public static void createServers() throws Exception {
createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0);
@ -286,7 +297,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
startServers();
String queueName = "queue" + RandomUtil.randomString();
String queueName = "testqueue" + RandomUtil.randomString();
final int numberOfMessages = 50;
@ -307,7 +318,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);
Wait.assertEquals(numberOfMessages, receiverCount::get, 5000);
Wait.assertEquals(numberOfMessages, receiverCount::get, 30_000);
Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));

View File

@ -1084,6 +1084,15 @@ public class QueueImplTest extends ActiveMQTestBase {
}
}
@Test
public void testNoRedistributorInternalQueue() throws Exception {
QueueImpl queue = getTemporaryQueue();
queue.setInternalQueue(true);
queue.addRedistributor(0);
Assert.assertNull(queue.getRedistributor());
}
private void testConsumerWithFilters(final boolean direct) throws Exception {
QueueImpl queue = getTemporaryQueue();