ARTEMIS-3024 Expiry or DLQ Retry not working with AMQP

This commit is contained in:
Clebert Suconic 2020-12-08 14:02:19 -05:00
parent 7fafe179f1
commit 825341734a
4 changed files with 73 additions and 1 deletions

View File

@ -524,7 +524,11 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
} }
protected Object getMessageAnnotation(String annotation) { protected Object getMessageAnnotation(String annotation) {
return getMessageAnnotation(Symbol.getSymbol(annotation)); Object messageAnnotation = getMessageAnnotation(Symbol.getSymbol(AMQPMessageSupport.toAnnotationName(annotation)));
if (messageAnnotation == null) {
messageAnnotation = getMessageAnnotation(Symbol.getSymbol(annotation));
}
return messageAnnotation;
} }
protected Object getMessageAnnotation(Symbol annotation) { protected Object getMessageAnnotation(Symbol annotation) {

View File

@ -361,6 +361,10 @@ public interface Queue extends Bindable,CriticalComponent {
int retryMessages(Filter filter) throws Exception; int retryMessages(Filter filter) throws Exception;
default int retryMessages(Filter filter, Integer expectedHits) throws Exception {
return retryMessages(filter);
}
void addRedistributor(long delay); void addRedistributor(long delay);
void cancelRedistributor() throws Exception; void cancelRedistributor() throws Exception;

View File

@ -2140,6 +2140,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
QueueIterateAction messageAction) throws Exception { QueueIterateAction messageAction) throws Exception {
int count = 0; int count = 0;
int txCount = 0; int txCount = 0;
Integer expectedHits = messageAction.expectedHits();
// This is to avoid scheduling depaging while iterQueue is happening // This is to avoid scheduling depaging while iterQueue is happening
// this should minimize the use of the paged executor. // this should minimize the use of the paged executor.
depagePending = true; depagePending = true;
@ -2170,6 +2171,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
txCount++; txCount++;
count++; count++;
if (expectedHits != null && count >= expectedHits.intValue()) {
break;
}
} }
} }
@ -2611,10 +2615,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override @Override
public int retryMessages(Filter filter) throws Exception { public int retryMessages(Filter filter) throws Exception {
return retryMessages(filter, null);
}
@Override
public int retryMessages(Filter filter, Integer expectedHits) throws Exception {
final HashMap<String, Long> queues = new HashMap<>(); final HashMap<String, Long> queues = new HashMap<>();
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
public Integer expectedHits() {
return expectedHits;
}
@Override @Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@ -4164,6 +4178,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
*/ */
abstract class QueueIterateAction { abstract class QueueIterateAction {
public Integer expectedHits() {
return null;
}
/** /**
* *
* @param tx the transaction which the message action should participate in * @param tx the transaction which the message action should participate in

View File

@ -128,6 +128,52 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
@Test(timeout = 60000)
public void testRetryExpiry() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setTimeToLive(1);
message.setText("Test-Message");
message.setDurable(true);
message.setApplicationProperty("key1", "Value1");
sender.send(message);
message = new AmqpMessage();
message.setTimeToLive(1);
message.setBytes(new byte[500 * 1024]);
sender.send(message);
sender.close();
final Queue dlqView = getProxyToQueue(getDeadLetterAddress());
Wait.assertEquals(2, dlqView::getMessageCount);
Assert.assertEquals(2, dlqView.retryMessages(null));
Wait.assertEquals(0, dlqView::getMessageCount);
Wait.assertEquals(2, queueView::getMessageCount);
AmqpReceiver receiver = session.createReceiver(getQueueName());
// Now try and get the message
receiver.flow(2);
for (int i = 0; i < 2; i++) {
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
}
connection.close();
Wait.assertEquals(0, queueView::getMessageCount);
Wait.assertEquals(0, dlqView::getMessageCount);
}
/** This test is validating a broker feature where the message copy through the DLQ will receive an annotation. /** This test is validating a broker feature where the message copy through the DLQ will receive an annotation.
* It is also testing filter on that annotation. */ * It is also testing filter on that annotation. */
@Test(timeout = 60000) @Test(timeout = 60000)