diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 9cf8b61908..51ad8e7651 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -334,7 +334,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (distance < 1) { //Binding added locally. If a matching remote binding with consumers exist, add a redistributor - Binding binding = getBinding(routingName); + Binding binding = addressManager.getBinding(routingName); if (binding != null) { @@ -435,7 +435,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } SimpleString addressName = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS); - Binding binding = getBinding(CompositeAddress.isFullyQualified(addressName) ? addressName : queueName); + Binding binding = addressManager.getBinding(CompositeAddress.isFullyQualified(addressName) ? addressName : queueName); if (binding != null) { // We have a local queue @@ -496,7 +496,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return; } - Binding binding = getBinding(queueName); + Binding binding = addressManager.getBinding(queueName); if (binding == null) { logger.debug("PostOffice notification / CONSUMER_CLOSED: Could not find queue {}", queueName); @@ -636,7 +636,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding String delimiter = server.getConfiguration().getWildcardConfiguration().getDelimiterString(); SimpleString internalDivertName = ResourceNames.getRetroactiveResourceDivertName(prefix, delimiter, address); - if (getBinding(internalDivertName) != null) { + if (addressManager.getBinding(internalDivertName) != null) { server.destroyDivert(internalDivertName, true); } @@ -1081,7 +1081,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public Binding getBinding(final SimpleString name) { + public synchronized Binding getBinding(final SimpleString name) { return addressManager.getBinding(name); } @@ -1620,6 +1620,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding final Transaction tx = context.getTransaction(); final Long deliveryTime; + + boolean containsDurables = false; + if (message.hasScheduledDeliveryTime()) { deliveryTime = message.getScheduledDeliveryTime(); } else { @@ -1658,6 +1661,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding final List durableQueues = entry.getValue().getDurableQueues(); if (!durableQueues.isEmpty()) { processRouteToDurableQueues(message, context, deliveryTime, tx, durableQueues, refs); + containsDurables = true; } } @@ -1669,6 +1673,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (tx != null) { tx.addOperation(new AddOperation(refs)); + } else if (!containsDurables) { + processReferences(refs, direct); } else { // This will use the same thread if there are no pending operations // avoiding a context switch on this case diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index 2bd045500d..22a8bdb476 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientConsumer; @@ -280,7 +281,12 @@ public class ClusterConnectionBridge extends BridgeImpl { createPermissiveManagementNotificationToFilter() + ")"); - sessionConsumer.createQueue(new QueueConfiguration(notifQueueName).setAddress(managementNotificationAddress).setFilterString(filter).setDurable(false).setTemporary(true)); + sessionConsumer.createQueue(new QueueConfiguration(notifQueueName) + .setAddress(managementNotificationAddress) + .setFilterString(filter) + .setDurable(false) + .setTemporary(true) + .setRoutingType(RoutingType.MULTICAST)); notifConsumer = sessionConsumer.createConsumer(notifQueueName); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java index 992b909ff8..5680d7aa54 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java @@ -156,7 +156,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { @Override public synchronized boolean isHighAcceptPriority(final Message message) { - if (consumerCount == 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) { + if (consumerCount <= 0 || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) { return false; } @@ -242,7 +242,10 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { } } - consumerCount--; + if (--consumerCount < 0) { + consumerCount = 0; + } + } @Override diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml index 70aebe5297..ed669afd00 100644 --- a/tests/soak-tests/pom.xml +++ b/tests/soak-tests/pom.xml @@ -169,6 +169,23 @@ jakarta.json-api test + + + org.springframework + spring-core + test + + + org.springframework + spring-context + test + + + org.springframework + spring-jms + test + + diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clusterNotificationsContinuity/ClusterNotificationsContinuityTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clusterNotificationsContinuity/ClusterNotificationsContinuityTest.java new file mode 100644 index 0000000000..09bffd1aef --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clusterNotificationsContinuity/ClusterNotificationsContinuityTest.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak.clusterNotificationsContinuity; + +import javax.jms.Connection; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; +import java.io.File; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.management.SimpleManagement; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.tests.soak.SoakTestBase; +import org.apache.activemq.artemis.util.ServerUtil; +import org.apache.activemq.artemis.utils.SpawnedVMSupport; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.cli.helper.HelperCreate; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.listener.DefaultMessageListenerContainer; + +import static org.apache.activemq.artemis.utils.TestParameters.testProperty; + +/** + * Refer to ./scripts/parameters.sh for suggested parameters + * + * Tests for an issue that's dependent on high overall system load. + * The following parameters are used to tune the resource demands of the test: + * NUMBER_OF_SERVERS, NUMBER_OF_QUEUES, NUMBER_OF_CONSUMERS + * + */ + +public class ClusterNotificationsContinuityTest extends SoakTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final String SERVER_NAME_BASE = "clusterNotifications/cncBroker-"; + public static final int SERVER_PORT_BASE = 61616; + private static final String TEST_NAME = "CLUSTER_NOTIFICATIONS_CONTINUITY"; + private static final boolean TEST_ENABLED = Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true")); + + private static final int NUMBER_OF_SERVERS = testProperty(TEST_NAME, "NUMBER_OF_SERVERS", 3); + private static final int NUMBER_OF_QUEUES = testProperty(TEST_NAME, "NUMBER_OF_QUEUES", 200); + private static final int NUMBER_OF_WORKERS = testProperty(TEST_NAME, "NUMBER_OF_WORKERS", 10); + private static final String QUEUE_NAME_PREFIX = "TEST.QUEUE."; + private final Process[] serverProcesses = new Process[NUMBER_OF_SERVERS]; + private Process dmlcProcess; + + @BeforeClass + public static void createServers() throws Exception { + for (int s = 0; s < NUMBER_OF_SERVERS; s++) { + String serverName = SERVER_NAME_BASE + s; + + String staticClusterURI; + + { + StringBuffer urlBuffer = new StringBuffer(); + boolean first = true; + for (int i = 0; i < NUMBER_OF_SERVERS; i++) { + if (i != s) { + if (!first) { + urlBuffer.append(","); + } + first = false; + urlBuffer.append("tcp://localhost:" + (SERVER_PORT_BASE + i)); + } + } + + staticClusterURI = urlBuffer.toString(); + } + + File serverLocation = getFileServerLocation(serverName); + deleteDirectory(serverLocation); + + HelperCreate cliCreateServer = new HelperCreate(); + cliCreateServer + .setRole("amq") + .setUser("admin") + .setPassword("admin") + .setAllowAnonymous(true) + .setNoWeb(true) + .setArtemisInstance(serverLocation) + .setPortOffset(s) + .setClustered(true); + + cliCreateServer.setMessageLoadBalancing("OFF_WITH_REDISTRIBUTION"); + cliCreateServer.setStaticCluster(staticClusterURI); + cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1"); + + cliCreateServer.createServer(); + + Properties brokerProperties = new Properties(); + brokerProperties.put("addressesSettings.#.redistributionDelay", "0"); + + File brokerPropertiesFile = new File(serverLocation, "broker.properties"); + saveProperties(brokerProperties, brokerPropertiesFile); + } + } + + @Before + public void before() throws Exception { + Assume.assumeTrue(TEST_ENABLED); + for (int i = 0; i < NUMBER_OF_SERVERS; i++) { + String serverName = SERVER_NAME_BASE + i; + + cleanupData(serverName); + File brokerPropertiesFile = new File(getServerLocation(serverName), "broker.properties"); + serverProcesses[i] = startServer(serverName, 0, 0, brokerPropertiesFile); + } + + for (int i = 0; i < NUMBER_OF_SERVERS; i++) { + ServerUtil.waitForServerToStart(i, 10_000); + SimpleManagement simpleManagement = new SimpleManagement("tcp://localhost:" + (SERVER_PORT_BASE + i), null, null); + Wait.assertEquals(NUMBER_OF_SERVERS, () -> simpleManagement.listNetworkTopology().size(), 5000); + } + } + + @Test + public void testClusterNotificationsContinuity() throws Throwable { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + runAfter(factory::close); + + CountDownLatch latch = new CountDownLatch(NUMBER_OF_QUEUES); + ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_WORKERS); + runAfter(executorService::shutdownNow); + + //run dmlc in spawned process to more easily manage its lifecycle + dmlcProcess = SpawnedVMSupport.spawnVM("org.apache.activemq.artemis.tests.soak.clusterNotificationsContinuity.ClusterNotificationsContinuityTest"); + runAfter(dmlcProcess::destroyForcibly); + + for (int i = 0; i < NUMBER_OF_QUEUES; i++) { + Queue queue = ActiveMQDestination.createQueue(QUEUE_NAME_PREFIX + i); + + executorService.execute(() -> { + try (Connection connection = factory.createConnection(); + Session session = connection.createSession(Session.SESSION_TRANSACTED)) { + + logger.debug("Sending message to queue: {}", queue.getQueueName()); + session.createProducer(queue).send(session.createTextMessage("Message")); + session.commit(); + + latch.countDown(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + }); + } + + latch.await(5000, TimeUnit.MILLISECONDS); + + if (!dmlcProcess.waitFor(30_000, TimeUnit.MILLISECONDS)) { + dmlcProcess.destroyForcibly(); + } + + for (int i = 0; i < NUMBER_OF_SERVERS; i++) { + String serverName = SERVER_NAME_BASE + i; + + File artemisLog = new File("target/" + serverName + "/log/artemis.log"); + checkLogRecord(artemisLog, false, "AMQ224037"); + } + + } + + @After + public void cleanup() { + SpawnedVMSupport.forceKill(); + + for (int i = 0; i < NUMBER_OF_SERVERS; i++) { + serverProcesses[i].destroy(); + String serverName = SERVER_NAME_BASE + i; + cleanupData(serverName); + } + } + + public static void main(String[] args) throws Exception { + ClusterNotificationsContinuityTest cncTest = new ClusterNotificationsContinuityTest(); + cncTest.runDMLCClient(); + } + + private void runDMLCClient() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + runAfter(factory::close); + + CountDownLatch latch = new CountDownLatch(NUMBER_OF_QUEUES); + List containers = new ArrayList<>(); + + for (int i = 0; i < NUMBER_OF_QUEUES; i++) { + try { + String queueName = QUEUE_NAME_PREFIX + i; + + DefaultMessageListenerContainer container = new DefaultMessageListenerContainer(); + container.setCacheLevelName("CACHE_NONE"); + container.setSessionTransacted(true); + container.setSessionAcknowledgeModeName("SESSION_TRANSACTED"); + container.setConcurrentConsumers(NUMBER_OF_WORKERS); + container.setConnectionFactory(new CachingConnectionFactory(factory)); + container.setDestinationName(queueName); + container.setReceiveTimeout(100); + container.setMessageListener((MessageListener) msg -> { + logger.debug("Message received on queue: {} ", queueName); + latch.countDown(); + }); + + container.initialize(); + container.start(); + containers.add(container); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + + latch.await(10_000, TimeUnit.MILLISECONDS); + + containers.parallelStream().forEach(dmlc -> { + try { + dmlc.stop(); + Wait.waitFor(() -> dmlc.getActiveConsumerCount() == 0); + dmlc.shutdown(); + ((CachingConnectionFactory) dmlc.getConnectionFactory()).destroy(); + } catch (Exception ignore) { + } + }); + + } + +} diff --git a/tests/soak-tests/src/test/scripts/parameters.sh b/tests/soak-tests/src/test/scripts/parameters.sh index 8f443c681a..62908b885b 100755 --- a/tests/soak-tests/src/test/scripts/parameters.sh +++ b/tests/soak-tests/src/test/scripts/parameters.sh @@ -136,3 +136,9 @@ export TEST_CLIENT_FAILURE_OPENWIRE_TOTAL_ITERATION=2 export TEST_CLIENT_FAILURE_OPENWIRE_NUMBER_OF_VMS=5 export TEST_CLIENT_FAILURE_OPENWIRE_NUMBER_OF_MESSAGES=20000 export TEST_CLIENT_FAILURE_OPENWIRE_MEMORY_CLIENT=-Xmx256m + +#clusterNotificationsContinuityTest +export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_TEST_ENABLED=true +export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_SERVERS=3 +export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200 +export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10 \ No newline at end of file