ARTEMIS-374 support schedule messages on LVQ
This commit is contained in:
parent
cd04b49f89
commit
1eb631e2b8
artemis-server/src
main/java/org/apache/activemq/artemis/core/server
test/java/org/apache/activemq/artemis/core/server/impl
tests
integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server
timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl
unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core
|
@ -68,9 +68,9 @@ public interface Queue extends Bindable {
|
|||
|
||||
void addTail(MessageReference ref, boolean direct);
|
||||
|
||||
void addHead(MessageReference ref);
|
||||
void addHead(MessageReference ref, boolean scheduling);
|
||||
|
||||
void addHead(final List<MessageReference> refs);
|
||||
void addHead(final List<MessageReference> refs, boolean scheduling);
|
||||
|
||||
void acknowledge(MessageReference ref) throws Exception;
|
||||
|
||||
|
|
|
@ -66,6 +66,10 @@ public class LastValueQueue extends QueueImpl {
|
|||
|
||||
@Override
|
||||
public synchronized void addTail(final MessageReference ref, final boolean direct) {
|
||||
if (scheduleIfPossible(ref)) {
|
||||
return;
|
||||
}
|
||||
|
||||
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
|
||||
|
||||
if (prop != null) {
|
||||
|
@ -74,18 +78,7 @@ public class LastValueQueue extends QueueImpl {
|
|||
if (hr != null) {
|
||||
// We need to overwrite the old ref with the new one and ack the old one
|
||||
|
||||
MessageReference oldRef = hr.getReference();
|
||||
|
||||
referenceHandled();
|
||||
|
||||
try {
|
||||
oldRef.acknowledge();
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
|
||||
}
|
||||
|
||||
hr.setReference(ref);
|
||||
replaceLVQMessage(ref, hr);
|
||||
|
||||
}
|
||||
else {
|
||||
|
@ -102,35 +95,60 @@ public class LastValueQueue extends QueueImpl {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void addHead(final MessageReference ref) {
|
||||
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
|
||||
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
|
||||
|
||||
if (prop != null) {
|
||||
HolderReference hr = map.get(prop);
|
||||
|
||||
if (hr != null) {
|
||||
// We keep the current ref and ack the one we are returning
|
||||
if (scheduling) {
|
||||
// We need to overwrite the old ref with the new one and ack the old one
|
||||
|
||||
super.referenceHandled();
|
||||
|
||||
try {
|
||||
super.acknowledge(ref);
|
||||
replaceLVQMessage(ref, hr);
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
|
||||
else {
|
||||
// We keep the current ref and ack the one we are returning
|
||||
|
||||
super.referenceHandled();
|
||||
|
||||
try {
|
||||
super.acknowledge(ref);
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
map.put(prop, (HolderReference) ref);
|
||||
hr = new HolderReference(prop, ref);
|
||||
|
||||
super.addHead(ref);
|
||||
map.put(prop, hr);
|
||||
|
||||
super.addHead(hr, scheduling);
|
||||
}
|
||||
}
|
||||
else {
|
||||
super.addHead(ref);
|
||||
super.addHead(ref, scheduling);
|
||||
}
|
||||
}
|
||||
|
||||
private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
|
||||
MessageReference oldRef = hr.getReference();
|
||||
|
||||
referenceHandled();
|
||||
|
||||
try {
|
||||
oldRef.acknowledge();
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
|
||||
}
|
||||
|
||||
hr.setReference(ref);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void refRemoved(MessageReference ref) {
|
||||
synchronized (this) {
|
||||
|
|
|
@ -483,9 +483,9 @@ public class QueueImpl implements Queue {
|
|||
|
||||
/* Called when a message is cancelled back into the queue */
|
||||
@Override
|
||||
public synchronized void addHead(final MessageReference ref) {
|
||||
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
|
||||
flushDeliveriesInTransit();
|
||||
if (scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
|
||||
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -496,10 +496,10 @@ public class QueueImpl implements Queue {
|
|||
|
||||
/* Called when a message is cancelled back into the queue */
|
||||
@Override
|
||||
public synchronized void addHead(final List<MessageReference> refs) {
|
||||
public synchronized void addHead(final List<MessageReference> refs, boolean scheduling) {
|
||||
flushDeliveriesInTransit();
|
||||
for (MessageReference ref : refs) {
|
||||
addHead(ref);
|
||||
addHead(ref, scheduling);
|
||||
}
|
||||
|
||||
resetAllIterators();
|
||||
|
@ -526,11 +526,7 @@ public class QueueImpl implements Queue {
|
|||
|
||||
@Override
|
||||
public void addTail(final MessageReference ref, final boolean direct) {
|
||||
if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) {
|
||||
synchronized (this) {
|
||||
messagesAdded++;
|
||||
}
|
||||
|
||||
if (scheduleIfPossible(ref)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -572,6 +568,17 @@ public class QueueImpl implements Queue {
|
|||
deliverAsync();
|
||||
}
|
||||
|
||||
protected boolean scheduleIfPossible(MessageReference ref) {
|
||||
if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) {
|
||||
synchronized (this) {
|
||||
messagesAdded++;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* This will wait for any pending deliveries to finish
|
||||
*/
|
||||
|
@ -1110,7 +1117,7 @@ public class QueueImpl implements Queue {
|
|||
ref.getMessage().putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, ref.getScheduledDeliveryTime());
|
||||
ref.setScheduledDeliveryTime(0);
|
||||
}
|
||||
this.addHead(scheduledMessages);
|
||||
this.addHead(scheduledMessages, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2509,7 +2516,7 @@ public class QueueImpl implements Queue {
|
|||
}
|
||||
|
||||
// The message failed to be delivered, hence we try again
|
||||
addHead(reference);
|
||||
addHead(reference, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2634,7 +2641,7 @@ public class QueueImpl implements Queue {
|
|||
}
|
||||
|
||||
void postRollback(final LinkedList<MessageReference> refs) {
|
||||
addHead(refs);
|
||||
addHead(refs, false);
|
||||
}
|
||||
|
||||
private long calculateRedeliveryDelay(final AddressSettings addressSettings, final int deliveryCount) {
|
||||
|
|
|
@ -235,7 +235,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
|
|||
if (trace) {
|
||||
ActiveMQServerLogger.LOGGER.trace("Delivering " + list.size() + " elements on list to queue " + queue);
|
||||
}
|
||||
queue.addHead(list);
|
||||
queue.addHead(list, true);
|
||||
}
|
||||
|
||||
// Just to speed up GC
|
||||
|
|
|
@ -939,12 +939,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addHead(MessageReference ref) {
|
||||
public void addHead(MessageReference ref, boolean scheduling) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addHead(List<MessageReference> refs) {
|
||||
public void addHead(List<MessageReference> refs, boolean scheduling) {
|
||||
for (MessageReference ref : refs) {
|
||||
addFirst(ref);
|
||||
}
|
||||
|
|
|
@ -504,6 +504,43 @@ public class LVQTest extends ActiveMQTestBase {
|
|||
assertEquals(0, queue.getDeliveringCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScheduledMessages() throws Exception {
|
||||
final long DELAY_TIME = 5000;
|
||||
final int MESSAGE_COUNT = 5;
|
||||
Queue queue = server.locateQueue(qName1);
|
||||
ClientProducer producer = clientSession.createProducer(address);
|
||||
ClientConsumer consumer = clientSession.createConsumer(qName1);
|
||||
SimpleString rh = new SimpleString("SMID1");
|
||||
long timeSent = 0;
|
||||
for (int i = 0; i < MESSAGE_COUNT; i++) {
|
||||
ClientMessage m = createTextMessage(clientSession, "m" + i);
|
||||
m.setDurable(true);
|
||||
m.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
|
||||
timeSent = System.currentTimeMillis();
|
||||
m.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timeSent + DELAY_TIME);
|
||||
producer.send(m);
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
// allow schedules to elapse so the messages will be delivered to the queue
|
||||
long start = System.currentTimeMillis();
|
||||
while (queue.getScheduledCount() > 0 && System.currentTimeMillis() - start <= DELAY_TIME) {
|
||||
Thread.sleep(50);
|
||||
}
|
||||
|
||||
assertTrue(queue.getScheduledCount() == 0);
|
||||
|
||||
clientSession.start();
|
||||
ClientMessage m = consumer.receive(DELAY_TIME);
|
||||
assertNotNull(m);
|
||||
long actualDelay = System.currentTimeMillis() - timeSent + 50;
|
||||
assertTrue(actualDelay >= DELAY_TIME);
|
||||
m.acknowledge();
|
||||
assertEquals(m.getBodyBuffer().readString(), "m" + (MESSAGE_COUNT - 1));
|
||||
assertEquals(0, queue.getScheduledCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleAcksPersistedCorrectly2() throws Exception {
|
||||
|
||||
|
|
|
@ -236,7 +236,7 @@ public class QueueImplTest extends ActiveMQTestBase {
|
|||
MessageReference messageReference = generateReference(queue, 1);
|
||||
queue.addConsumer(consumer);
|
||||
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
|
||||
queue.addHead(messageReference);
|
||||
queue.addHead(messageReference, false);
|
||||
|
||||
boolean gotLatch = countDownLatch.await(3000, TimeUnit.MILLISECONDS);
|
||||
Assert.assertTrue(gotLatch);
|
||||
|
|
|
@ -102,13 +102,13 @@ public class FakeQueue implements Queue {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addHead(MessageReference ref) {
|
||||
public void addHead(MessageReference ref, boolean scheduling) {
|
||||
// no-op
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addHead(List<MessageReference> ref) {
|
||||
public void addHead(List<MessageReference> ref, boolean scheduling) {
|
||||
// no-op
|
||||
|
||||
}
|
||||
|
|
|
@ -376,7 +376,7 @@ public class QueueImplTest extends ActiveMQTestBase {
|
|||
|
||||
refs2.addFirst(ref);
|
||||
|
||||
queue.addHead(ref);
|
||||
queue.addHead(ref, false);
|
||||
}
|
||||
|
||||
List<MessageReference> refs3 = new ArrayList<>();
|
||||
|
@ -1039,9 +1039,9 @@ public class QueueImplTest extends ActiveMQTestBase {
|
|||
MessageReference messageReference = generateReference(queue, 1);
|
||||
MessageReference messageReference2 = generateReference(queue, 2);
|
||||
MessageReference messageReference3 = generateReference(queue, 3);
|
||||
queue.addHead(messageReference);
|
||||
queue.addHead(messageReference, false);
|
||||
queue.addTail(messageReference2);
|
||||
queue.addHead(messageReference3);
|
||||
queue.addHead(messageReference3, false);
|
||||
|
||||
Assert.assertEquals(0, consumer.getReferences().size());
|
||||
queue.addConsumer(consumer);
|
||||
|
@ -1071,9 +1071,9 @@ public class QueueImplTest extends ActiveMQTestBase {
|
|||
MessageReference messageReference = generateReference(queue, 1);
|
||||
MessageReference messageReference2 = generateReference(queue, 2);
|
||||
MessageReference messageReference3 = generateReference(queue, 3);
|
||||
queue.addHead(messageReference);
|
||||
queue.addHead(messageReference2);
|
||||
queue.addHead(messageReference3);
|
||||
queue.addHead(messageReference, false);
|
||||
queue.addHead(messageReference2, false);
|
||||
queue.addHead(messageReference3, false);
|
||||
Assert.assertEquals(queue.getReference(2), messageReference2);
|
||||
|
||||
}
|
||||
|
@ -1084,9 +1084,9 @@ public class QueueImplTest extends ActiveMQTestBase {
|
|||
MessageReference messageReference = generateReference(queue, 1);
|
||||
MessageReference messageReference2 = generateReference(queue, 2);
|
||||
MessageReference messageReference3 = generateReference(queue, 3);
|
||||
queue.addHead(messageReference);
|
||||
queue.addHead(messageReference2);
|
||||
queue.addHead(messageReference3);
|
||||
queue.addHead(messageReference, false);
|
||||
queue.addHead(messageReference2, false);
|
||||
queue.addHead(messageReference3, false);
|
||||
Assert.assertNull(queue.getReference(5));
|
||||
|
||||
}
|
||||
|
@ -1231,7 +1231,7 @@ public class QueueImplTest extends ActiveMQTestBase {
|
|||
@Override
|
||||
public void run() {
|
||||
if (first) {
|
||||
queue.addHead(messageReference);
|
||||
queue.addHead(messageReference, false);
|
||||
}
|
||||
else {
|
||||
queue.addTail(messageReference);
|
||||
|
|
Loading…
Reference in New Issue