ARTEMIS-4165 Delete messages in case of queue destroy

Messages should be acked even while paging. That will allow page transactions or anything else
to be cleared accordingly.
This commit is contained in:
iliya 2023-02-10 15:37:38 +03:00 committed by clebertsuconic
parent 7fbd1b1a51
commit 6b5f78bfc4
2 changed files with 37 additions and 13 deletions

View File

@ -2294,13 +2294,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.isPaged() && queueDestroyed) {
// this means the queue is being removed
// hence paged references are just going away through
// page cleanup
continue;
}
if (filter1 == null || filter1.match(ref.getMessage())) {
if (messageAction.actMessage(tx, ref)) {
iter.remove();
@ -2337,7 +2330,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
if (pageIterator != null && !queueDestroyed) {
if (pageIterator != null) {
while (pageIterator.hasNext()) {
PagedReference reference = pageIterator.next();
pageIterator.remove();
@ -2362,7 +2355,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (txCount > 0) {
tx.commit();
tx = null;
}
if (filter != null && !queueDestroyed && pageSubscription != null) {

View File

@ -100,6 +100,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
@ -1738,8 +1739,6 @@ public class PagingTest extends ParameterDBTestBase {
producer.close();
session.start();
long timeout = System.currentTimeMillis() + 30000;
waitBuffer(cons, NUM_MESSAGES / 5);
waitBuffer(cons2, NUM_MESSAGES / 5);
@ -1760,8 +1759,6 @@ public class PagingTest extends ParameterDBTestBase {
final HashMap<Integer, AtomicInteger> recordsType = countJournal(config);
assertNull("The system is acking page records instead of just delete data", recordsType.get((int) JournalRecordIds.ACKNOWLEDGE_CURSOR));
Pair<List<RecordInfo>, List<PreparedTransactionInfo>> journalData = loadMessageJournal(config);
HashSet<Long> deletedQueueReferences = new HashSet<>();
@ -1822,6 +1819,41 @@ public class PagingTest extends ParameterDBTestBase {
server.stop();
}
@Test
public void testDeleteQueue() throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultNettyConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
SimpleString queue = new SimpleString("testPurge:" + RandomUtil.randomString());
server.addAddressInfo(new AddressInfo(queue, RoutingType.ANYCAST));
QueueImpl purgeQueue = (QueueImpl) server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setMaxConsumers(1).setPurgeOnNoConsumers(false).setAutoCreateAddress(false));
ConnectionFactory cf = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
javax.jms.Queue jmsQueue = session.createQueue(queue.toString());
purgeQueue.getPageSubscription().getPagingStore().startPaging();
MessageProducer producer = session.createProducer(jmsQueue);
for (int i = 0; i < 100; i++) {
producer.send(session.createTextMessage("hello" + i));
session.commit();
}
Wait.assertEquals(100, purgeQueue::getMessageCount);
purgeQueue.deleteQueue(false);
Wait.assertEquals(0, ()->server.getPagingManager().getTransactions().size(), 5_000);
}
private void waitBuffer(ClientConsumerInternal clientBuffer, int bufferSize) {
Wait.assertTrue(() -> "expected " + bufferSize + " but got " + clientBuffer.getBufferSize(), () -> clientBuffer.getBufferSize() > bufferSize, 5000, 100);
}