ARTEMIS-2572 The retryMessages remove all paged messages

Add a paged message to the tail, when the QueueIterateAction doesn't handle it, to avoid removing unhandled paged message. Move the refRemoved calls from the QueueIterateActions to the iterQueue to fix the queue stats.
This commit is contained in:
brusdev 2019-12-10 12:55:58 +01:00
parent f43b9c395f
commit b666cb495b
2 changed files with 109 additions and 25 deletions

View File

@ -1929,16 +1929,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return new QueueIterateAction() { return new QueueIterateAction() {
@Override @Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
return actMessage(tx, ref, true);
}
@Override
public boolean actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
incDelivering(ref); incDelivering(ref);
acknowledge(tx, ref, ackReason, null); acknowledge(tx, ref, ackReason, null);
if (fromMessageReferences) {
refRemoved(ref);
}
return true; return true;
} }
}; };
@ -1982,6 +1974,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (filter1 == null || filter1.match(ref.getMessage())) { if (filter1 == null || filter1.match(ref.getMessage())) {
if (messageAction.actMessage(tx, ref)) { if (messageAction.actMessage(tx, ref)) {
iter.remove(); iter.remove();
refRemoved(ref);
} }
txCount++; txCount++;
count++; count++;
@ -1998,7 +1991,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter1); List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter1);
for (MessageReference messageReference : cancelled) { for (MessageReference messageReference : cancelled) {
messageAction.actMessage(tx, messageReference, false); messageAction.actMessage(tx, messageReference);
count++; count++;
txCount++; txCount++;
} }
@ -2019,7 +2012,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (filter1 == null || filter1.match(reference.getMessage())) { if (filter1 == null || filter1.match(reference.getMessage())) {
count++; count++;
txCount++; txCount++;
messageAction.actMessage(tx, reference, false); if (!messageAction.actMessage(tx, reference)) {
addTail(reference, false);
}
} else { } else {
addTail(reference, false); addTail(reference, false);
} }
@ -2394,8 +2389,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (!ignored) { if (!ignored) {
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null); move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null);
refRemoved(ref);
//move(toAddress, tx, ref, false, rejectDuplicates);
} }
return true; return true;
@ -2459,7 +2452,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} else { } else {
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false); move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
} }
refRemoved(ref);
return true; return true;
} }
@ -3908,18 +3901,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* @throws Exception * @throws Exception
*/ */
public abstract boolean actMessage(Transaction tx, MessageReference ref) throws Exception; public abstract boolean actMessage(Transaction tx, MessageReference ref) throws Exception;
/**
*
* @param tx the transaction which the message action should participate in
* @param ref the message reference which the action should act upon
* @param fromMessageReferences false if the queue's stats should *not* be updated (e.g. paged or scheduled refs)
* @return true if the action should result in the removal of the message from the queue; false otherwise
* @throws Exception
*/
public boolean actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
return actMessage(tx, ref);
}
} }
/* For external use we need to use a synchronized version since the list is not thread safe */ /* For external use we need to use a synchronized version since the list is not thread safe */

View File

@ -578,6 +578,109 @@ public class PagingTest extends ActiveMQTestBase {
Assert.assertEquals(numberOfMessages * 2, removedMessages); Assert.assertEquals(numberOfMessages * 2, removedMessages);
} }
@Test
public void testQueueRetryMessages() throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
final int numberOfMessages = 500;
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
session.createQueue(PagingTest.ADDRESS, new SimpleString(PagingTest.ADDRESS + "Queue"), null, true);
session.createQueue(PagingTest.ADDRESS + "Original", PagingTest.ADDRESS + "QueueOriginal", null, true);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= MESSAGE_SIZE; j++) {
bb.put(getSamplebyte(j));
}
for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
producer.send(message);
message = session.createMessage(true);
message.getBodyBuffer().writeBytes(body);
message.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, PagingTest.ADDRESS + "Original");
message.putStringProperty(Message.HDR_ORIGINAL_QUEUE, PagingTest.ADDRESS + "QueueOriginal");
producer.send(message);
if (i % 1000 == 0) {
session.commit();
}
}
session.commit();
producer.close();
session.close();
session = sf.createSession(false, false, false);
producer = session.createProducer(PagingTest.ADDRESS);
producer.send(session.createMessage(true));
session.rollback();
producer.close();
session.close();
session = sf.createSession(false, false, false);
producer = session.createProducer(PagingTest.ADDRESS);
for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
producer.send(message);
message = session.createMessage(true);
message.getBodyBuffer().writeBytes(body);
message.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, PagingTest.ADDRESS + "Original");
message.putStringProperty(Message.HDR_ORIGINAL_QUEUE, PagingTest.ADDRESS + "QueueOriginal");
producer.send(message);
if (i % 1000 == 0) {
session.commit();
}
}
session.commit();
producer.close();
session.close();
Queue queue = server.locateQueue(new SimpleString(PagingTest.ADDRESS + "Queue"));
Queue originalQueue = server.locateQueue(new SimpleString(PagingTest.ADDRESS + "QueueOriginal"));
Wait.assertEquals(numberOfMessages * 4, queue::getMessageCount);
Wait.assertEquals(0, originalQueue::getMessageCount);
QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource(ResourceNames.QUEUE + PagingSendTest.ADDRESS + "Queue");
QueueControl originalQueueControl = (QueueControl) this.server.getManagementService().getResource(ResourceNames.QUEUE + PagingSendTest.ADDRESS + "QueueOriginal");
queueControl.retryMessages();
Wait.assertEquals(numberOfMessages * 2, queue::getMessageCount, 5000);
Wait.assertEquals(numberOfMessages * 2, originalQueue::getMessageCount, 5000);
}
@Test @Test
public void testEmptyAddress() throws Exception { public void testEmptyAddress() throws Exception {
if (storeType == StoreConfiguration.StoreType.FILE) { if (storeType == StoreConfiguration.StoreType.FILE) {