ARTEMIS-3834 include paged messages sending to DLA

This commit is contained in:
AntonRoskvist 2022-05-24 20:07:01 +02:00 committed by Justin Bertram
parent 8386d6f517
commit 6dd7965906
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
3 changed files with 107 additions and 14 deletions

View File

@ -2576,27 +2576,34 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
}
if (pageIterator != null && !queueDestroyed) {
while (pageIterator.hasNext()) {
PagedReference ref = pageIterator.next();
if (ref.getMessage().getMessageID() == messageID) {
incDelivering(ref);
sendToDeadLetterAddress(null, ref);
pageIterator.remove();
refRemoved(ref);
return true;
}
}
}
return false;
}
}
@Override
public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
int count = 0;
try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage())) {
incDelivering(ref);
sendToDeadLetterAddress(null, ref);
iter.remove();
refRemoved(ref);
count++;
}
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
incDelivering(ref);
return sendToDeadLetterAddress(tx, ref);
}
return count;
}
});
}
@Override

View File

@ -1653,6 +1653,92 @@ public class QueueControlTest extends ManagementTestBase {
assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
}
/**
* Test send to DLA while paging includes paged messages
*/
@Test
public void testSendToDLAIncludesPagedMessages() throws Exception {
final SimpleString dla = new SimpleString("DLA");
final SimpleString qName = new SimpleString("q1");
final SimpleString adName = new SimpleString("ad1");
final SimpleString dlq = new SimpleString("DLQ1");
final String sampleText = "Put me on DLQ";
final int messageCount = 10;
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setMaxSizeBytes(200L);
server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
server.getAddressSettingsRepository().addMatch(dla.toString(), addressSettings);
session.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(durable));
session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable));
// Send message to queue, make sure address enters paging.
ClientProducer producer = session.createProducer(adName);
for (int i = 0; i < messageCount; i++) {
producer.send(createTextMessage(session, sampleText));
}
Wait.assertTrue(server.locateQueue(qName).getPagingStore()::isPaging);
//Send all messages to DLA, make sure all are sent
QueueControl queueControl = createManagementControl(adName, qName);
Assert.assertEquals(messageCount, queueControl.sendMessagesToDeadLetterAddress(null));
Assert.assertEquals(0, getMessageCount(queueControl));
//Make sure all shows up on DLA
queueControl = createManagementControl(dla, dlq);
Assert.assertEquals(messageCount, getMessageCount(queueControl));
queueControl.removeAllMessages();
}
/**
* Test send single message to DLA while paging includes paged message
*/
@Test
public void testSendMessageToDLAIncludesPagedMessage() throws Exception {
final SimpleString dla = new SimpleString("DLA");
final SimpleString qName = new SimpleString("q1");
final SimpleString adName = new SimpleString("ad1");
final SimpleString dlq = new SimpleString("DLQ1");
final String sampleText = "Message Content";
final int messageCount = 10;
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setMaxSizeBytes(200L);
server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
server.getAddressSettingsRepository().addMatch(dla.toString(), addressSettings);
session.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(durable));
session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable));
// Send message to queue, make sure address enters paging.
ClientProducer producer = session.createProducer(adName);
for (int i = 0; i < messageCount; i++) {
producer.send(createTextMessage(session, sampleText));
}
Wait.assertTrue(server.locateQueue(qName).getPagingStore()::isPaging);
//Send identifiable message to DLA
producer.send(createTextMessage(session, sampleText).putStringProperty("myID", "unique"));
QueueControl queueControl = createManagementControl(adName, qName);
Map<String, Object>[] messages = queueControl.listMessages(null);
long messageID = (Long) messages[messageCount].get("messageID");
Assert.assertTrue(queueControl.sendMessageToDeadLetterAddress(messageID));
queueControl.removeAllMessages();
//Make sure it shows up on DLA
queueControl = createManagementControl(dla, dlq);
messages = queueControl.listMessages(null);
Assert.assertEquals(1, messages.length);
Assert.assertEquals("unique", (String) messages[0].get("myID"));
queueControl.removeAllMessages();
}
/**
* <ol>
* <li>send a message to queue</li>

View File

@ -545,7 +545,7 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
@Override
public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception {
return (Integer) proxy.invokeOperation("sendMessagesToDeadLetterAddress", filterStr);
return (Integer) proxy.invokeOperation(Integer.class, "sendMessagesToDeadLetterAddress", filterStr);
}
@Override