This commit is contained in:
Clebert Suconic 2019-12-03 14:24:02 -05:00
commit 09e3b7b6d5
3 changed files with 91 additions and 34 deletions

View File

@ -220,9 +220,9 @@ public class LastValueQueue extends QueueImpl {
QueueIterateAction queueIterateAction = super.createDeleteMatchingAction(ackReason);
return new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
removeIfCurrent(ref);
queueIterateAction.actMessage(tx, ref);
return queueIterateAction.actMessage(tx, ref);
}
};
}

View File

@ -1928,17 +1928,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
QueueIterateAction createDeleteMatchingAction(AckReason ackReason) {
return new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
actMessage(tx, ref, true);
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
return actMessage(tx, ref, true);
}
@Override
public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
public boolean actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
incDelivering(ref);
acknowledge(tx, ref, ackReason, null);
if (fromMessageReferences) {
refRemoved(ref);
}
return true;
}
};
}
@ -1956,23 +1957,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private int iterQueue(final int flushLimit,
final Filter filter1,
QueueIterateAction messageAction) throws Exception {
return iterQueue(flushLimit, filter1, messageAction, true);
}
/**
* This is a generic method for any method interacting on the Queue to move or delete messages
* Instead of duplicate the feature we created an abstract class where you pass the logic for
* each message.
*
* @param filter1
* @param messageAction
* @return
* @throws Exception
*/
private int iterQueue(final int flushLimit,
final Filter filter1,
QueueIterateAction messageAction,
final boolean remove) throws Exception {
int count = 0;
int txCount = 0;
@ -1996,8 +1980,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (filter1 == null || filter1.match(ref.getMessage())) {
messageAction.actMessage(tx, ref);
if (remove) {
if (messageAction.actMessage(tx, ref)) {
iter.remove();
}
txCount++;
@ -2393,7 +2376,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return iterQueue(flushLimit, filter, new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
boolean ignored = false;
incDelivering(ref);
@ -2414,6 +2397,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
refRemoved(ref);
//move(toAddress, tx, ref, false, rejectDuplicates);
}
return true;
}
});
}
@ -2421,8 +2406,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
moveBetweenSnFQueues(queueSuffix, tx, ref);
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
return moveBetweenSnFQueues(queueSuffix, tx, ref);
}
});
}
@ -2430,13 +2415,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
RoutingContext routingContext = new RoutingContextImpl(tx);
routingContext.setAddress(server.locateQueue(queueName).getAddress());
server.getPostOffice().getBinding(queueName).route(ref.getMessage(), routingContext);
postOffice.processRoute(ref.getMessage(), routingContext, false);
return false;
}
}, false);
});
}
@Override
@ -2446,7 +2432,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
String originalMessageAddress = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_ADDRESS);
String originalMessageQueue = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
@ -2474,7 +2460,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
}
refRemoved(ref);
return true;
}
return false;
}
});
@ -3137,7 +3126,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
private void moveBetweenSnFQueues(final SimpleString queueSuffix,
private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
final Transaction tx,
final MessageReference ref) throws Exception {
Message copyMessage = makeCopy(ref, false, false);
@ -3199,6 +3188,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
deliverAsync();
}
});
return true;
}
private Pair<String, Binding> locateTargetBinding(SimpleString queueSuffix, Message copyMessage, long oldQueueID) {
@ -3909,10 +3900,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
*/
abstract class QueueIterateAction {
public abstract void actMessage(Transaction tx, MessageReference ref) throws Exception;
/**
*
* @param tx the transaction which the message action should participate in
* @param ref the message reference which the action should act upon
* @return true if the action should result in the removal of the message from the queue; false otherwise
* @throws Exception
*/
public abstract boolean actMessage(Transaction tx, MessageReference ref) throws Exception;
public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
actMessage(tx, ref);
/**
*
* @param tx the transaction which the message action should participate in
* @param ref the message reference which the action should act upon
* @param fromMessageReferences false if the queue's stats should *not* be updated (e.g. paged or scheduled refs)
* @return true if the action should result in the removal of the message from the queue; false otherwise
* @throws Exception
*/
public boolean actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
return actMessage(tx, ref);
}
}

View File

@ -1073,6 +1073,57 @@ public class QueueControlTest extends ManagementTestBase {
clientConsumer.close();
}
@Test
public void testRetryMessageWithoutDLQ() throws Exception {
final SimpleString qName = new SimpleString("q1");
final SimpleString qName2 = new SimpleString("q2");
final SimpleString adName = new SimpleString("ad1");
final SimpleString adName2 = new SimpleString("ad2");
final String sampleText = "Put me on DLQ";
session.createQueue(adName, RoutingType.MULTICAST, qName, null, durable);
session.createQueue(adName2, RoutingType.MULTICAST, qName2, null, durable);
// Send message to queue.
ClientProducer producer = session.createProducer(adName);
producer.send(createTextMessage(session, sampleText));
ClientMessage m = createTextMessage(session, sampleText);
m.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, adName2);
m.putStringProperty(Message.HDR_ORIGINAL_QUEUE, qName2);
producer.send(m);
session.start();
QueueControl queueControl = createManagementControl(adName, qName);
assertMessageMetrics(queueControl, 2, durable);
QueueControl queueControl2 = createManagementControl(adName2, qName2);
assertMessageMetrics(queueControl2, 0, durable);
queueControl.retryMessages();
Wait.assertTrue(() -> getMessageCount(queueControl) == 1, 2000, 100);
assertMessageMetrics(queueControl, 1, durable);
Wait.assertTrue(() -> getMessageCount(queueControl2) == 1, 2000, 100);
assertMessageMetrics(queueControl2, 1, durable);
ClientConsumer clientConsumer = session.createConsumer(qName);
ClientMessage clientMessage = clientConsumer.receive(500);
Assert.assertNotNull(clientMessage);
clientMessage.acknowledge();
Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
clientConsumer = session.createConsumer(qName2);
clientMessage = clientConsumer.receive(500);
Assert.assertNotNull(clientMessage);
clientMessage.acknowledge();
Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
clientConsumer.close();
}
/**
* Test retry - get a diverted message from DLQ and put on original queue.
*/