This commit is contained in:
Clebert Suconic 2019-04-10 16:00:51 -04:00
commit 1917463ead
3 changed files with 37 additions and 1 deletions

View File

@ -119,6 +119,10 @@ public class LastValueQueue extends QueueImpl {
@Override
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
// we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
return;
}
SimpleString lastValueProp = ref.getLastValueProperty();

View File

@ -186,7 +186,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final QueuePendingMessageMetrics deliveringMetrics = new QueuePendingMessageMetrics(this);
private final ScheduledDeliveryHandler scheduledDeliveryHandler;
protected final ScheduledDeliveryHandler scheduledDeliveryHandler;
private AtomicLong messagesAdded = new AtomicLong(0);

View File

@ -95,6 +95,38 @@ public class LVQTest extends ActiveMQTestBase {
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
}
@Test
public void testMultipleRollback() throws Exception {
AddressSettings qs = new AddressSettings();
qs.setDefaultLastValueQueue(true);
qs.setRedeliveryDelay(1);
server.getAddressSettingsRepository().addMatch(address.toString(), qs);
ClientProducer producer = clientSessionTxReceives.createProducer(address);
ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
SimpleString messageId1 = new SimpleString("SMID1");
ClientMessage m1 = createTextMessage(clientSession, "m1");
m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, messageId1);
producer.send(m1);
clientSessionTxReceives.start();
for (int i = 0; i < 10; i++) {
System.out.println("#Deliver " + i);
ClientMessage m = consumer.receive(5000);
Assert.assertNotNull(m);
m.acknowledge();
clientSessionTxReceives.rollback();
}
m1 = createTextMessage(clientSession, "m1");
m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, messageId1);
producer.send(m1);
ClientMessage m = consumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
Assert.assertNull(consumer.receiveImmediate());
clientSessionTxReceives.commit();
}
@Test
public void testFirstMessageReceivedButAckedAfter() throws Exception {
ClientProducer producer = clientSession.createProducer(address);