This commit is contained in:
Clebert Suconic 2020-12-08 14:43:14 -05:00
commit caf314ec82
4 changed files with 73 additions and 1 deletions

View File

@ -524,7 +524,11 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
}
protected Object getMessageAnnotation(String annotation) {
return getMessageAnnotation(Symbol.getSymbol(annotation));
Object messageAnnotation = getMessageAnnotation(Symbol.getSymbol(AMQPMessageSupport.toAnnotationName(annotation)));
if (messageAnnotation == null) {
messageAnnotation = getMessageAnnotation(Symbol.getSymbol(annotation));
}
return messageAnnotation;
}
protected Object getMessageAnnotation(Symbol annotation) {

View File

@ -361,6 +361,10 @@ public interface Queue extends Bindable,CriticalComponent {
int retryMessages(Filter filter) throws Exception;
default int retryMessages(Filter filter, Integer expectedHits) throws Exception {
return retryMessages(filter);
}
void addRedistributor(long delay);
void cancelRedistributor() throws Exception;

View File

@ -2140,6 +2140,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
QueueIterateAction messageAction) throws Exception {
int count = 0;
int txCount = 0;
Integer expectedHits = messageAction.expectedHits();
// This is to avoid scheduling depaging while iterQueue is happening
// this should minimize the use of the paged executor.
depagePending = true;
@ -2170,6 +2171,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
txCount++;
count++;
if (expectedHits != null && count >= expectedHits.intValue()) {
break;
}
}
}
@ -2611,10 +2615,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public int retryMessages(Filter filter) throws Exception {
return retryMessages(filter, null);
}
@Override
public int retryMessages(Filter filter, Integer expectedHits) throws Exception {
final HashMap<String, Long> queues = new HashMap<>();
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
public Integer expectedHits() {
return expectedHits;
}
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@ -4164,6 +4178,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
*/
abstract class QueueIterateAction {
public Integer expectedHits() {
return null;
}
/**
*
* @param tx the transaction which the message action should participate in

View File

@ -128,6 +128,52 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testRetryExpiry() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setTimeToLive(1);
message.setText("Test-Message");
message.setDurable(true);
message.setApplicationProperty("key1", "Value1");
sender.send(message);
message = new AmqpMessage();
message.setTimeToLive(1);
message.setBytes(new byte[500 * 1024]);
sender.send(message);
sender.close();
final Queue dlqView = getProxyToQueue(getDeadLetterAddress());
Wait.assertEquals(2, dlqView::getMessageCount);
Assert.assertEquals(2, dlqView.retryMessages(null));
Wait.assertEquals(0, dlqView::getMessageCount);
Wait.assertEquals(2, queueView::getMessageCount);
AmqpReceiver receiver = session.createReceiver(getQueueName());
// Now try and get the message
receiver.flow(2);
for (int i = 0; i < 2; i++) {
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
}
connection.close();
Wait.assertEquals(0, queueView::getMessageCount);
Wait.assertEquals(0, dlqView::getMessageCount);
}
/** This test is validating a broker feature where the message copy through the DLQ will receive an annotation.
* It is also testing filter on that annotation. */
@Test(timeout = 60000)