ARTEMIS-4959 moveMessages operation can move more messages than max messageCount

This commit is contained in:
Howard Gao 2024-07-29 21:09:48 +08:00 committed by clebertsuconic
parent 86f36e95b0
commit 4ce0dfca2d
5 changed files with 116 additions and 54 deletions

View File

@ -1382,6 +1382,10 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
final int messageCount) throws Exception {
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (this.queue.getName().toString().equals(otherQueueName)) {
//doesn't make sense to move messages to itself
throw new IllegalArgumentException("Cannot move messages onto itself");
}
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount);
}

View File

@ -2280,7 +2280,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
QueueIterateAction messageAction) throws Exception {
int count = 0;
int txCount = 0;
Integer expectedHits = messageAction.expectedHits();
// This is to avoid scheduling depaging while iterQueue is happening
// this should minimize the use of the paged executor.
depagePending = true;
@ -2296,7 +2295,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) {
while (iter.hasNext() && !messageAction.expectedHitsReached(count)) {
MessageReference ref = iter.next();
if (filter1 == null || filter1.match(ref.getMessage())) {
@ -2306,9 +2305,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
txCount++;
count++;
if (expectedHits != null && count >= expectedHits.intValue()) {
break;
}
}
}
@ -2320,11 +2316,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
txCount = 0;
}
if (messageAction.expectedHitsReached(count)) {
return count;
}
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(ref -> filter1 == null ? true : filter1.match(ref.getMessage()));
for (MessageReference messageReference : cancelled) {
messageAction.actMessage(tx, messageReference);
count++;
txCount++;
if (messageAction.expectedHitsReached(count)) {
break;
}
}
if (txCount > 0) {
@ -2336,7 +2339,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (pageIterator != null) {
while (pageIterator.hasNext()) {
while (pageIterator.hasNext() && !messageAction.expectedHitsReached(count)) {
PagedReference reference = pageIterator.next();
pageIterator.remove();
@ -2750,12 +2753,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final Integer expectedHits = messageCount > 0 ? messageCount : null;
final DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
return iterQueue(flushLimit, filter, new QueueIterateAction() {
@Override
public Integer expectedHits() {
return expectedHits;
}
return iterQueue(flushLimit, filter, new QueueIterateAction(expectedHits) {
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
boolean ignored = false;
@ -2814,11 +2812,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final HashMap<String, Long> queues = new HashMap<>();
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
public Integer expectedHits() {
return expectedHits;
}
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction(expectedHits) {
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@ -4472,8 +4466,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
*/
abstract class QueueIterateAction {
public Integer expectedHits() {
return null;
protected Integer expectedHits;
QueueIterateAction(Integer expectedHits) {
this.expectedHits = expectedHits;
}
QueueIterateAction() {
this.expectedHits = null;
}
/**
@ -4484,6 +4484,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* @throws Exception
*/
public abstract boolean actMessage(Transaction tx, MessageReference ref) throws Exception;
public boolean expectedHitsReached(int currentHits) {
return expectedHits != null && currentHits >= expectedHits.intValue();
}
}
/* For external use we need to use a synchronized version since the list is not thread safe */

View File

@ -1692,6 +1692,37 @@ public abstract class ActiveMQTestBase extends ArtemisTestCase {
}
}
public List<String> sendMessageBatch(int batchSize,
ClientSession session,
SimpleString queueAddr) throws ActiveMQException {
List<String> messageIds = new ArrayList<>();
ClientProducer producer = session.createProducer(queueAddr);
for (int i = 0; i < batchSize; i++) {
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);
String id = UUID.randomUUID().toString();
message.putStringProperty("id", id);
message.putIntProperty("seq", i); // this is to make the print-data easier to debug
messageIds.add(id);
producer.send(message);
}
session.commit();
return messageIds;
}
public boolean waitForMessages(Queue queue, int count, long timeout) throws Exception {
long timeToWait = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < timeToWait) {
if (queue.getMessageCount() >= count) {
return true;
}
Thread.sleep(100);
}
return false;
}
protected final ClientMessage createMessage(ClientSession session,
int counter,
boolean durable) throws ActiveMQException {

View File

@ -75,6 +75,7 @@ import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -137,6 +138,63 @@ public class QueueControlTest extends ManagementTestBase {
this.durable = durable;
}
@TestTemplate
public void testMoveMessagesInPagingMode() throws Exception {
final int TOTAL_MESSAGES = 10000;
final String DLA = "DLA";
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false);
SimpleString queueAddr = SimpleString.of("testQueue");
session.createQueue(QueueConfiguration.of(queueAddr).setDurable(durable));
SimpleString dlq = SimpleString.of(DLA);
session.createQueue(QueueConfiguration.of(dlq));
// Set up paging on the queue address
AddressSettings addressSettings = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(16 * 1024).setDeadLetterAddress(dlq);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
sendMessageBatch(TOTAL_MESSAGES, session, queueAddr);
Queue queue = server.locateQueue(queueAddr);
// Give time Queue.deliverAsync to deliver messages
assertTrue(waitForMessages(queue, TOTAL_MESSAGES, 5000));
PagingStore queuePagingStore = queue.getPagingStore();
assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
//invoke moveMessages op
String queueControlResourceName = ResourceNames.QUEUE + "testQueue";
Object resource = server.getManagementService().getResource(queueControlResourceName);
QueueControl queueControl = (QueueControl) resource;
assertEquals(queueControl.getMessageCount(), 10000);
// move messages to DLQ
int count = queueControl.moveMessages(500, "", DLA, false, 500);
assertEquals(500, count);
//messages shouldn't move on to the same queue
try {
queueControl.moveMessages(1000, "", "testQueue", false, 9000);
fail("messages cannot be moved on to the queue itself");
} catch (IllegalArgumentException ok) {
//ok
}
// 9500 left
count = queueControl.moveMessages(1000, "", DLA, false, 9000);
assertEquals(9000, count);
// 500 left, try move 1000
count = queueControl.moveMessages(100, "", DLA, false, 1000);
assertEquals(500, count);
// zero left, try move again
count = queueControl.moveMessages(100, "", DLA, false, 1000);
assertEquals(0, count);
}
@TestTemplate
public void testGetPreparedTransactionMessageCount() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();

View File

@ -20,16 +20,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@ -292,25 +288,6 @@ public class PagingSendTest extends ActiveMQTestBase {
}
}
public List<String> sendMessageBatch(int batchSize,
ClientSession session,
SimpleString queueAddr) throws ActiveMQException {
List<String> messageIds = new ArrayList<>();
ClientProducer producer = session.createProducer(queueAddr);
for (int i = 0; i < batchSize; i++) {
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);
String id = UUID.randomUUID().toString();
message.putStringProperty("id", id);
message.putIntProperty("seq", i); // this is to make the print-data easier to debug
messageIds.add(id);
producer.send(message);
}
session.commit();
return messageIds;
}
/**
* checks that there are no message duplicates in the page. Any IDs found in the ignoreIds field will not be tested
* this allows us to test only those messages that have been sent after the address has started paging (ignoring any
@ -337,18 +314,6 @@ public class PagingSendTest extends ActiveMQTestBase {
assertEquals(0, duplicates);
}
public boolean waitForMessages(Queue queue, int count, long timeout) throws Exception {
long timeToWait = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < timeToWait) {
if (queue.getMessageCount() >= count) {
return true;
}
Thread.sleep(100);
}
return false;
}
/**
* checks that there are no message duplicates in the page. Any IDs found in the ignoreIds field will not be tested
* this allows us to test only those messages that have been sent after the address has started paging (ignoring any