diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 81bd5653dc..c92325a0bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -68,9 +68,9 @@ public interface Queue extends Bindable { void addTail(MessageReference ref, boolean direct); - void addHead(MessageReference ref); + void addHead(MessageReference ref, boolean scheduling); - void addHead(final List refs); + void addHead(final List refs, boolean scheduling); void acknowledge(MessageReference ref) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 5420688e18..9feb60eb48 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -66,6 +66,10 @@ public class LastValueQueue extends QueueImpl { @Override public synchronized void addTail(final MessageReference ref, final boolean direct) { + if (scheduleIfPossible(ref)) { + return; + } + SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); if (prop != null) { @@ -74,18 +78,7 @@ public class LastValueQueue extends QueueImpl { if (hr != null) { // We need to overwrite the old ref with the new one and ack the old one - MessageReference oldRef = hr.getReference(); - - referenceHandled(); - - try { - oldRef.acknowledge(); - } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); - } - - hr.setReference(ref); + replaceLVQMessage(ref, hr); } else { @@ -102,35 +95,60 @@ public class LastValueQueue extends QueueImpl { } @Override - public synchronized void addHead(final MessageReference ref) { + public synchronized void addHead(final MessageReference ref, boolean scheduling) { SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); if (prop != null) { HolderReference hr = map.get(prop); if (hr != null) { - // We keep the current ref and ack the one we are returning + if (scheduling) { + // We need to overwrite the old ref with the new one and ack the old one - super.referenceHandled(); - - try { - super.acknowledge(ref); + replaceLVQMessage(ref, hr); } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); + else { + // We keep the current ref and ack the one we are returning + + super.referenceHandled(); + + try { + super.acknowledge(ref); + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); + } } } else { - map.put(prop, (HolderReference) ref); + hr = new HolderReference(prop, ref); - super.addHead(ref); + map.put(prop, hr); + + super.addHead(hr, scheduling); } } else { - super.addHead(ref); + super.addHead(ref, scheduling); } } + private void replaceLVQMessage(MessageReference ref, HolderReference hr) { + MessageReference oldRef = hr.getReference(); + + referenceHandled(); + + try { + oldRef.acknowledge(); + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); + } + + hr.setReference(ref); + } + + @Override protected void refRemoved(MessageReference ref) { synchronized (this) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 866ea8529a..e555ee2326 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -483,9 +483,9 @@ public class QueueImpl implements Queue { /* Called when a message is cancelled back into the queue */ @Override - public synchronized void addHead(final MessageReference ref) { + public synchronized void addHead(final MessageReference ref, boolean scheduling) { flushDeliveriesInTransit(); - if (scheduledDeliveryHandler.checkAndSchedule(ref, false)) { + if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) { return; } @@ -496,10 +496,10 @@ public class QueueImpl implements Queue { /* Called when a message is cancelled back into the queue */ @Override - public synchronized void addHead(final List refs) { + public synchronized void addHead(final List refs, boolean scheduling) { flushDeliveriesInTransit(); for (MessageReference ref : refs) { - addHead(ref); + addHead(ref, scheduling); } resetAllIterators(); @@ -526,11 +526,7 @@ public class QueueImpl implements Queue { @Override public void addTail(final MessageReference ref, final boolean direct) { - if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) { - synchronized (this) { - messagesAdded++; - } - + if (scheduleIfPossible(ref)) { return; } @@ -572,6 +568,17 @@ public class QueueImpl implements Queue { deliverAsync(); } + protected boolean scheduleIfPossible(MessageReference ref) { + if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) { + synchronized (this) { + messagesAdded++; + } + + return true; + } + return false; + } + /** * This will wait for any pending deliveries to finish */ @@ -1110,7 +1117,7 @@ public class QueueImpl implements Queue { ref.getMessage().putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, ref.getScheduledDeliveryTime()); ref.setScheduledDeliveryTime(0); } - this.addHead(scheduledMessages); + this.addHead(scheduledMessages, true); } } @@ -2509,7 +2516,7 @@ public class QueueImpl implements Queue { } // The message failed to be delivered, hence we try again - addHead(reference); + addHead(reference, false); } } } @@ -2634,7 +2641,7 @@ public class QueueImpl implements Queue { } void postRollback(final LinkedList refs) { - addHead(refs); + addHead(refs, false); } private long calculateRedeliveryDelay(final AddressSettings addressSettings, final int deliveryCount) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java index 6b5e2e2705..1fda4d82c9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java @@ -235,7 +235,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { if (trace) { ActiveMQServerLogger.LOGGER.trace("Delivering " + list.size() + " elements on list to queue " + queue); } - queue.addHead(list); + queue.addHead(list, true); } // Just to speed up GC diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index c54881d26c..22ec4384ba 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -939,12 +939,12 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public void addHead(MessageReference ref) { + public void addHead(MessageReference ref, boolean scheduling) { } @Override - public void addHead(List refs) { + public void addHead(List refs, boolean scheduling) { for (MessageReference ref : refs) { addFirst(ref); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java index 24f1faa3bc..8eabb2867b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java @@ -504,6 +504,43 @@ public class LVQTest extends ActiveMQTestBase { assertEquals(0, queue.getDeliveringCount()); } + @Test + public void testScheduledMessages() throws Exception { + final long DELAY_TIME = 5000; + final int MESSAGE_COUNT = 5; + Queue queue = server.locateQueue(qName1); + ClientProducer producer = clientSession.createProducer(address); + ClientConsumer consumer = clientSession.createConsumer(qName1); + SimpleString rh = new SimpleString("SMID1"); + long timeSent = 0; + for (int i = 0; i < MESSAGE_COUNT; i++) { + ClientMessage m = createTextMessage(clientSession, "m" + i); + m.setDurable(true); + m.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); + timeSent = System.currentTimeMillis(); + m.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timeSent + DELAY_TIME); + producer.send(m); + Thread.sleep(100); + } + + // allow schedules to elapse so the messages will be delivered to the queue + long start = System.currentTimeMillis(); + while (queue.getScheduledCount() > 0 && System.currentTimeMillis() - start <= DELAY_TIME) { + Thread.sleep(50); + } + + assertTrue(queue.getScheduledCount() == 0); + + clientSession.start(); + ClientMessage m = consumer.receive(DELAY_TIME); + assertNotNull(m); + long actualDelay = System.currentTimeMillis() - timeSent + 50; + assertTrue(actualDelay >= DELAY_TIME); + m.acknowledge(); + assertEquals(m.getBodyBuffer().readString(), "m" + (MESSAGE_COUNT - 1)); + assertEquals(0, queue.getScheduledCount()); + } + @Test public void testMultipleAcksPersistedCorrectly2() throws Exception { diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java index 8fb9b9beab..4205294afc 100644 --- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java +++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java @@ -236,7 +236,7 @@ public class QueueImplTest extends ActiveMQTestBase { MessageReference messageReference = generateReference(queue, 1); queue.addConsumer(consumer); messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000); - queue.addHead(messageReference); + queue.addHead(messageReference, false); boolean gotLatch = countDownLatch.await(3000, TimeUnit.MILLISECONDS); Assert.assertTrue(gotLatch); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index d53a51e670..99d01e6cfe 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -102,13 +102,13 @@ public class FakeQueue implements Queue { } @Override - public void addHead(MessageReference ref) { + public void addHead(MessageReference ref, boolean scheduling) { // no-op } @Override - public void addHead(List ref) { + public void addHead(List ref, boolean scheduling) { // no-op } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index f7ad00dae6..b1432e2776 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -376,7 +376,7 @@ public class QueueImplTest extends ActiveMQTestBase { refs2.addFirst(ref); - queue.addHead(ref); + queue.addHead(ref, false); } List refs3 = new ArrayList<>(); @@ -1039,9 +1039,9 @@ public class QueueImplTest extends ActiveMQTestBase { MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference2 = generateReference(queue, 2); MessageReference messageReference3 = generateReference(queue, 3); - queue.addHead(messageReference); + queue.addHead(messageReference, false); queue.addTail(messageReference2); - queue.addHead(messageReference3); + queue.addHead(messageReference3, false); Assert.assertEquals(0, consumer.getReferences().size()); queue.addConsumer(consumer); @@ -1071,9 +1071,9 @@ public class QueueImplTest extends ActiveMQTestBase { MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference2 = generateReference(queue, 2); MessageReference messageReference3 = generateReference(queue, 3); - queue.addHead(messageReference); - queue.addHead(messageReference2); - queue.addHead(messageReference3); + queue.addHead(messageReference, false); + queue.addHead(messageReference2, false); + queue.addHead(messageReference3, false); Assert.assertEquals(queue.getReference(2), messageReference2); } @@ -1084,9 +1084,9 @@ public class QueueImplTest extends ActiveMQTestBase { MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference2 = generateReference(queue, 2); MessageReference messageReference3 = generateReference(queue, 3); - queue.addHead(messageReference); - queue.addHead(messageReference2); - queue.addHead(messageReference3); + queue.addHead(messageReference, false); + queue.addHead(messageReference2, false); + queue.addHead(messageReference3, false); Assert.assertNull(queue.getReference(5)); } @@ -1231,7 +1231,7 @@ public class QueueImplTest extends ActiveMQTestBase { @Override public void run() { if (first) { - queue.addHead(messageReference); + queue.addHead(messageReference, false); } else { queue.addTail(messageReference);