ARTEMIS-4527 - Redistributor race when consumerCount reaches 0 in cluster

This commit is contained in:
a181321 2024-01-30 13:58:01 +01:00 committed by Clebert Suconic
parent 8ad8c9d385
commit efe450298d
6 changed files with 302 additions and 8 deletions

View File

@ -334,7 +334,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (distance < 1) {
//Binding added locally. If a matching remote binding with consumers exist, add a redistributor
Binding binding = getBinding(routingName);
Binding binding = addressManager.getBinding(routingName);
if (binding != null) {
@ -435,7 +435,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
SimpleString addressName = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
Binding binding = getBinding(CompositeAddress.isFullyQualified(addressName) ? addressName : queueName);
Binding binding = addressManager.getBinding(CompositeAddress.isFullyQualified(addressName) ? addressName : queueName);
if (binding != null) {
// We have a local queue
@ -496,7 +496,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return;
}
Binding binding = getBinding(queueName);
Binding binding = addressManager.getBinding(queueName);
if (binding == null) {
logger.debug("PostOffice notification / CONSUMER_CLOSED: Could not find queue {}", queueName);
@ -636,7 +636,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
String delimiter = server.getConfiguration().getWildcardConfiguration().getDelimiterString();
SimpleString internalDivertName = ResourceNames.getRetroactiveResourceDivertName(prefix, delimiter, address);
if (getBinding(internalDivertName) != null) {
if (addressManager.getBinding(internalDivertName) != null) {
server.destroyDivert(internalDivertName, true);
}
@ -1081,7 +1081,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
public Binding getBinding(final SimpleString name) {
public synchronized Binding getBinding(final SimpleString name) {
return addressManager.getBinding(name);
}
@ -1620,6 +1620,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
final Transaction tx = context.getTransaction();
final Long deliveryTime;
boolean containsDurables = false;
if (message.hasScheduledDeliveryTime()) {
deliveryTime = message.getScheduledDeliveryTime();
} else {
@ -1658,6 +1661,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
final List<Queue> durableQueues = entry.getValue().getDurableQueues();
if (!durableQueues.isEmpty()) {
processRouteToDurableQueues(message, context, deliveryTime, tx, durableQueues, refs);
containsDurables = true;
}
}
@ -1669,6 +1673,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (tx != null) {
tx.addOperation(new AddOperation(refs));
} else if (!containsDurables) {
processReferences(refs, direct);
} else {
// This will use the same thread if there are no pending operations
// avoiding a context switch on this case

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@ -280,7 +281,12 @@ public class ClusterConnectionBridge extends BridgeImpl {
createPermissiveManagementNotificationToFilter() +
")");
sessionConsumer.createQueue(new QueueConfiguration(notifQueueName).setAddress(managementNotificationAddress).setFilterString(filter).setDurable(false).setTemporary(true));
sessionConsumer.createQueue(new QueueConfiguration(notifQueueName)
.setAddress(managementNotificationAddress)
.setFilterString(filter)
.setDurable(false)
.setTemporary(true)
.setRoutingType(RoutingType.MULTICAST));
notifConsumer = sessionConsumer.createConsumer(notifQueueName);

View File

@ -156,7 +156,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
@Override
public synchronized boolean isHighAcceptPriority(final Message message) {
if (consumerCount == 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
if (consumerCount <= 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
return false;
}
@ -242,7 +242,10 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
}
}
consumerCount--;
if (--consumerCount < 0) {
consumerCount = 0;
}
}
@Override

View File

@ -169,6 +169,23 @@
<artifactId>jakarta.json-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,256 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.clusterNotificationsContinuity;
import javax.jms.Connection;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import static org.apache.activemq.artemis.utils.TestParameters.testProperty;
/**
* Refer to ./scripts/parameters.sh for suggested parameters
*
* Tests for an issue that's dependent on high overall system load.
* The following parameters are used to tune the resource demands of the test:
* NUMBER_OF_SERVERS, NUMBER_OF_QUEUES, NUMBER_OF_CONSUMERS
*
*/
public class ClusterNotificationsContinuityTest extends SoakTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String SERVER_NAME_BASE = "clusterNotifications/cncBroker-";
public static final int SERVER_PORT_BASE = 61616;
private static final String TEST_NAME = "CLUSTER_NOTIFICATIONS_CONTINUITY";
private static final boolean TEST_ENABLED = Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true"));
private static final int NUMBER_OF_SERVERS = testProperty(TEST_NAME, "NUMBER_OF_SERVERS", 3);
private static final int NUMBER_OF_QUEUES = testProperty(TEST_NAME, "NUMBER_OF_QUEUES", 200);
private static final int NUMBER_OF_WORKERS = testProperty(TEST_NAME, "NUMBER_OF_WORKERS", 10);
private static final String QUEUE_NAME_PREFIX = "TEST.QUEUE.";
private final Process[] serverProcesses = new Process[NUMBER_OF_SERVERS];
private Process dmlcProcess;
@BeforeClass
public static void createServers() throws Exception {
for (int s = 0; s < NUMBER_OF_SERVERS; s++) {
String serverName = SERVER_NAME_BASE + s;
String staticClusterURI;
{
StringBuffer urlBuffer = new StringBuffer();
boolean first = true;
for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
if (i != s) {
if (!first) {
urlBuffer.append(",");
}
first = false;
urlBuffer.append("tcp://localhost:" + (SERVER_PORT_BASE + i));
}
}
staticClusterURI = urlBuffer.toString();
}
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer
.setRole("amq")
.setUser("admin")
.setPassword("admin")
.setAllowAnonymous(true)
.setNoWeb(true)
.setArtemisInstance(serverLocation)
.setPortOffset(s)
.setClustered(true);
cliCreateServer.setMessageLoadBalancing("OFF_WITH_REDISTRIBUTION");
cliCreateServer.setStaticCluster(staticClusterURI);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1");
cliCreateServer.createServer();
Properties brokerProperties = new Properties();
brokerProperties.put("addressesSettings.#.redistributionDelay", "0");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
}
}
@Before
public void before() throws Exception {
Assume.assumeTrue(TEST_ENABLED);
for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
String serverName = SERVER_NAME_BASE + i;
cleanupData(serverName);
File brokerPropertiesFile = new File(getServerLocation(serverName), "broker.properties");
serverProcesses[i] = startServer(serverName, 0, 0, brokerPropertiesFile);
}
for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
ServerUtil.waitForServerToStart(i, 10_000);
SimpleManagement simpleManagement = new SimpleManagement("tcp://localhost:" + (SERVER_PORT_BASE + i), null, null);
Wait.assertEquals(NUMBER_OF_SERVERS, () -> simpleManagement.listNetworkTopology().size(), 5000);
}
}
@Test
public void testClusterNotificationsContinuity() throws Throwable {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
runAfter(factory::close);
CountDownLatch latch = new CountDownLatch(NUMBER_OF_QUEUES);
ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_WORKERS);
runAfter(executorService::shutdownNow);
//run dmlc in spawned process to more easily manage its lifecycle
dmlcProcess = SpawnedVMSupport.spawnVM("org.apache.activemq.artemis.tests.soak.clusterNotificationsContinuity.ClusterNotificationsContinuityTest");
runAfter(dmlcProcess::destroyForcibly);
for (int i = 0; i < NUMBER_OF_QUEUES; i++) {
Queue queue = ActiveMQDestination.createQueue(QUEUE_NAME_PREFIX + i);
executorService.execute(() -> {
try (Connection connection = factory.createConnection();
Session session = connection.createSession(Session.SESSION_TRANSACTED)) {
logger.debug("Sending message to queue: {}", queue.getQueueName());
session.createProducer(queue).send(session.createTextMessage("Message"));
session.commit();
latch.countDown();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
});
}
latch.await(5000, TimeUnit.MILLISECONDS);
if (!dmlcProcess.waitFor(30_000, TimeUnit.MILLISECONDS)) {
dmlcProcess.destroyForcibly();
}
for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
String serverName = SERVER_NAME_BASE + i;
File artemisLog = new File("target/" + serverName + "/log/artemis.log");
checkLogRecord(artemisLog, false, "AMQ224037");
}
}
@After
public void cleanup() {
SpawnedVMSupport.forceKill();
for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
serverProcesses[i].destroy();
String serverName = SERVER_NAME_BASE + i;
cleanupData(serverName);
}
}
public static void main(String[] args) throws Exception {
ClusterNotificationsContinuityTest cncTest = new ClusterNotificationsContinuityTest();
cncTest.runDMLCClient();
}
private void runDMLCClient() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
runAfter(factory::close);
CountDownLatch latch = new CountDownLatch(NUMBER_OF_QUEUES);
List<DefaultMessageListenerContainer> containers = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_QUEUES; i++) {
try {
String queueName = QUEUE_NAME_PREFIX + i;
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setCacheLevelName("CACHE_NONE");
container.setSessionTransacted(true);
container.setSessionAcknowledgeModeName("SESSION_TRANSACTED");
container.setConcurrentConsumers(NUMBER_OF_WORKERS);
container.setConnectionFactory(new CachingConnectionFactory(factory));
container.setDestinationName(queueName);
container.setReceiveTimeout(100);
container.setMessageListener((MessageListener) msg -> {
logger.debug("Message received on queue: {} ", queueName);
latch.countDown();
});
container.initialize();
container.start();
containers.add(container);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
latch.await(10_000, TimeUnit.MILLISECONDS);
containers.parallelStream().forEach(dmlc -> {
try {
dmlc.stop();
Wait.waitFor(() -> dmlc.getActiveConsumerCount() == 0);
dmlc.shutdown();
((CachingConnectionFactory) dmlc.getConnectionFactory()).destroy();
} catch (Exception ignore) {
}
});
}
}

View File

@ -136,3 +136,9 @@ export TEST_CLIENT_FAILURE_OPENWIRE_TOTAL_ITERATION=2
export TEST_CLIENT_FAILURE_OPENWIRE_NUMBER_OF_VMS=5
export TEST_CLIENT_FAILURE_OPENWIRE_NUMBER_OF_MESSAGES=20000
export TEST_CLIENT_FAILURE_OPENWIRE_MEMORY_CLIENT=-Xmx256m
#clusterNotificationsContinuityTest
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_TEST_ENABLED=true
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_SERVERS=3
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10