ARTEMIS-3862 Short lived subscriptiong makes address size inconsistent

This commit is contained in:
Clebert Suconic 2022-06-16 22:53:07 -04:00 committed by clebertsuconic
parent 196e604778
commit 9dd026118e
3 changed files with 219 additions and 1 deletions

View File

@ -2910,6 +2910,37 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
} }
/** This method is to only be used during deliveryAsync when the queue was destroyed
and the async process left more messages to be delivered
This is a race between destroying the queue and async sends that came after
the deleteQueue already happened. */
private void removeMessagesWhileDelivering() throws Exception {
assert queueDestroyed : "Method to be used only when the queue was destroyed";
Transaction tx = new TransactionImpl(storageManager);
int txCount = 0;
try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.isPaged()) {
// this means the queue is being removed
// hence paged references are just going away through
// page cleanup
continue;
}
acknowledge(tx, ref, AckReason.KILLED, null);
iter.remove();
refRemoved(ref);
txCount++;
}
if (txCount > 0) {
tx.commit();
}
}
}
/** /**
* This method will deliver as many messages as possible until all consumers are busy or there * This method will deliver as many messages as possible until all consumers are busy or there
* are no more matching or available messages. * are no more matching or available messages.
@ -2960,6 +2991,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
synchronized (this) { synchronized (this) {
if (queueDestroyed) {
if (messageReferences.size() == 0) {
return false;
}
try {
removeMessagesWhileDelivering();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
return false;
}
// Need to do these checks inside the synchronized // Need to do these checks inside the synchronized
if (isPaused() || !canDispatch()) { if (isPaused() || !canDispatch()) {
return false; return false;
@ -3109,6 +3152,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
private void checkDepage() { private void checkDepage() {
if (queueDestroyed) {
return;
}
if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.tryNext() != PageIterator.NextResult.noElements) { if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.tryNext() != PageIterator.NextResult.noElements) {
scheduleDepage(false); scheduleDepage(false);
} }

View File

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class RemoveSubscriptionRaceTest extends ActiveMQTestBase {
private static final String SUB_NAME = "SubscriptionStressTest";
ActiveMQServer server;
@Before
public void setServer() throws Exception {
}
@Test
public void testCreateSubscriptionCoreNoFiles() throws Exception {
internalTest("core", false, 5, 1000);
}
@Test
public void testCreateSubscriptionAMQPNoFiles() throws Exception {
internalTest("amqp", false, 5, 1000);
}
@Test
public void testCreateSubscriptionCoreRealFiles() throws Exception {
internalTest("core", true, 2, 200);
}
@Test
public void testCreateSubscriptionAMQPRealFiles() throws Exception {
internalTest("amqp", true, 2, 200);
}
public void internalTest(String protocol, boolean realFiles, int threads, int numberOfMessages) throws Exception {
server = createServer(realFiles, true);
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(SUB_NAME).addRoutingType(RoutingType.MULTICAST));
server.getConfiguration().addQueueConfiguration(new QueueConfiguration().setName("Sub_1").setAddress(SUB_NAME).setRoutingType(RoutingType.MULTICAST));
server.start();
CountDownLatch runningLatch = new CountDownLatch(threads);
AtomicBoolean running = new AtomicBoolean(true);
AtomicInteger errors = new AtomicInteger(0);
ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1, threads)); // I'm using the max here, because I may set threads=0 while hacking the test
runAfter(() -> executorService.shutdownNow());
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
CyclicBarrier flagStart = new CyclicBarrier(threads + 1);
for (int i = 0; i < threads; i++) {
executorService.execute(() -> {
try {
flagStart.await(10, TimeUnit.SECONDS);
for (int n = 0; n < numberOfMessages && running.get(); n++) {
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(SUB_NAME);
MessageConsumer consumer = session.createConsumer(topic);
Message message = consumer.receiveNoWait();
if (message != null) {
message.acknowledge();
}
connection.close();
}
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
} finally {
runningLatch.countDown();
}
});
}
Connection connection = factory.createConnection();
connection.start();
Queue queue = server.locateQueue("Sub_1");
Assert.assertNotNull(queue);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(SUB_NAME);
MessageProducer producer = session.createProducer(topic);
MessageConsumer consumer = session.createConsumer(session.createQueue(SUB_NAME + "::" + "Sub_1"));
flagStart.await(10, TimeUnit.SECONDS);
try {
for (int i = 0; i < numberOfMessages; i++) {
producer.send(session.createTextMessage("a"));
Assert.assertNotNull(consumer.receive(5000));
}
connection.close();
} finally {
running.set(false);
Assert.assertTrue(runningLatch.await(10, TimeUnit.SECONDS));
}
Wait.assertEquals(0, this::countAddMessage, 5000, 100);
Wait.assertEquals(0L, queue.getPagingStore()::getAddressSize, 2000, 100);
}
int countAddMessage() throws Exception {
StorageManager manager = server.getStorageManager();
if (manager instanceof JournalStorageManager) {
JournalStorageManager journalStorageManager = (JournalStorageManager) manager;
journalStorageManager.getMessageJournal().scheduleCompactAndBlock(5_000);
} else {
return 0;
}
HashMap<Integer, AtomicInteger> journalCounts = countJournal(server.getConfiguration());
AtomicInteger value = journalCounts.get((int) JournalRecordIds.ADD_MESSAGE_PROTOCOL);
if (value == null) {
return 0;
}
return value.get();
}
}

View File

@ -2738,7 +2738,7 @@ public class PagingTest extends ActiveMQTestBase {
for (int msgCount = 0; msgCount < numberOfMessages; msgCount++) { for (int msgCount = 0; msgCount < numberOfMessages; msgCount++) {
log.debug("Received " + msgCount); log.debug("Received " + msgCount);
msgReceived++; msgReceived++;
ClientMessage msg = consumer.receiveImmediate(); ClientMessage msg = consumer.receive(5000);
if (msg == null) { if (msg == null) {
log.debug("It's null. leaving now"); log.debug("It's null. leaving now");
sessionConsumer.commit(); sessionConsumer.commit();