From 9dd026118e19e9a07b89661412c4ab48a08f73ad Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 16 Jun 2022 22:53:07 -0400 Subject: [PATCH] ARTEMIS-3862 Short lived subscriptiong makes address size inconsistent --- .../artemis/core/server/impl/QueueImpl.java | 46 +++++ .../client/RemoveSubscriptionRaceTest.java | 172 ++++++++++++++++++ .../tests/integration/paging/PagingTest.java | 2 +- 3 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index e17fd6cb92..4068eafbcc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2910,6 +2910,37 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } + /** This method is to only be used during deliveryAsync when the queue was destroyed + and the async process left more messages to be delivered + This is a race between destroying the queue and async sends that came after + the deleteQueue already happened. */ + private void removeMessagesWhileDelivering() throws Exception { + assert queueDestroyed : "Method to be used only when the queue was destroyed"; + Transaction tx = new TransactionImpl(storageManager); + int txCount = 0; + + try (LinkedListIterator iter = iterator()) { + while (iter.hasNext()) { + MessageReference ref = iter.next(); + + if (ref.isPaged()) { + // this means the queue is being removed + // hence paged references are just going away through + // page cleanup + continue; + } + acknowledge(tx, ref, AckReason.KILLED, null); + iter.remove(); + refRemoved(ref); + txCount++; + } + + if (txCount > 0) { + tx.commit(); + } + } + } + /** * This method will deliver as many messages as possible until all consumers are busy or there * are no more matching or available messages. @@ -2960,6 +2991,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { synchronized (this) { + if (queueDestroyed) { + if (messageReferences.size() == 0) { + return false; + } + try { + removeMessagesWhileDelivering(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + return false; + } + // Need to do these checks inside the synchronized if (isPaused() || !canDispatch()) { return false; @@ -3109,6 +3152,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } private void checkDepage() { + if (queueDestroyed) { + return; + } if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.tryNext() != PageIterator.NextResult.noElements) { scheduleDepage(false); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java new file mode 100644 index 0000000000..0911d69e5b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; + +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class RemoveSubscriptionRaceTest extends ActiveMQTestBase { + + + private static final String SUB_NAME = "SubscriptionStressTest"; + + ActiveMQServer server; + + @Before + public void setServer() throws Exception { + } + + @Test + public void testCreateSubscriptionCoreNoFiles() throws Exception { + internalTest("core", false, 5, 1000); + } + + @Test + public void testCreateSubscriptionAMQPNoFiles() throws Exception { + internalTest("amqp", false, 5, 1000); + } + + @Test + public void testCreateSubscriptionCoreRealFiles() throws Exception { + internalTest("core", true, 2, 200); + } + + @Test + public void testCreateSubscriptionAMQPRealFiles() throws Exception { + internalTest("amqp", true, 2, 200); + } + + public void internalTest(String protocol, boolean realFiles, int threads, int numberOfMessages) throws Exception { + server = createServer(realFiles, true); + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(SUB_NAME).addRoutingType(RoutingType.MULTICAST)); + server.getConfiguration().addQueueConfiguration(new QueueConfiguration().setName("Sub_1").setAddress(SUB_NAME).setRoutingType(RoutingType.MULTICAST)); + server.start(); + + CountDownLatch runningLatch = new CountDownLatch(threads); + AtomicBoolean running = new AtomicBoolean(true); + AtomicInteger errors = new AtomicInteger(0); + + ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1, threads)); // I'm using the max here, because I may set threads=0 while hacking the test + + runAfter(() -> executorService.shutdownNow()); + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + + CyclicBarrier flagStart = new CyclicBarrier(threads + 1); + + for (int i = 0; i < threads; i++) { + executorService.execute(() -> { + try { + flagStart.await(10, TimeUnit.SECONDS); + for (int n = 0; n < numberOfMessages && running.get(); n++) { + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(SUB_NAME); + MessageConsumer consumer = session.createConsumer(topic); + Message message = consumer.receiveNoWait(); + if (message != null) { + message.acknowledge(); + } + connection.close(); + } + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + + } finally { + runningLatch.countDown(); + } + }); + } + + Connection connection = factory.createConnection(); + connection.start(); + + Queue queue = server.locateQueue("Sub_1"); + Assert.assertNotNull(queue); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(SUB_NAME); + MessageProducer producer = session.createProducer(topic); + MessageConsumer consumer = session.createConsumer(session.createQueue(SUB_NAME + "::" + "Sub_1")); + + flagStart.await(10, TimeUnit.SECONDS); + try { + for (int i = 0; i < numberOfMessages; i++) { + producer.send(session.createTextMessage("a")); + Assert.assertNotNull(consumer.receive(5000)); + } + connection.close(); + } finally { + running.set(false); + Assert.assertTrue(runningLatch.await(10, TimeUnit.SECONDS)); + } + + Wait.assertEquals(0, this::countAddMessage, 5000, 100); + + Wait.assertEquals(0L, queue.getPagingStore()::getAddressSize, 2000, 100); + } + + int countAddMessage() throws Exception { + StorageManager manager = server.getStorageManager(); + + if (manager instanceof JournalStorageManager) { + JournalStorageManager journalStorageManager = (JournalStorageManager) manager; + journalStorageManager.getMessageJournal().scheduleCompactAndBlock(5_000); + } else { + return 0; + } + + HashMap journalCounts = countJournal(server.getConfiguration()); + AtomicInteger value = journalCounts.get((int) JournalRecordIds.ADD_MESSAGE_PROTOCOL); + if (value == null) { + return 0; + } + return value.get(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 6e1549294f..1214df64f1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -2738,7 +2738,7 @@ public class PagingTest extends ActiveMQTestBase { for (int msgCount = 0; msgCount < numberOfMessages; msgCount++) { log.debug("Received " + msgCount); msgReceived++; - ClientMessage msg = consumer.receiveImmediate(); + ClientMessage msg = consumer.receive(5000); if (msg == null) { log.debug("It's null. leaving now"); sessionConsumer.commit();