This closes #362

This commit is contained in:
Clebert Suconic 2016-02-01 13:32:47 -05:00
commit f7148acf52
9 changed files with 115 additions and 53 deletions

View File

@ -68,9 +68,9 @@ public interface Queue extends Bindable {
void addTail(MessageReference ref, boolean direct);
void addHead(MessageReference ref);
void addHead(MessageReference ref, boolean scheduling);
void addHead(final List<MessageReference> refs);
void addHead(final List<MessageReference> refs, boolean scheduling);
void acknowledge(MessageReference ref) throws Exception;

View File

@ -66,6 +66,10 @@ public class LastValueQueue extends QueueImpl {
@Override
public synchronized void addTail(final MessageReference ref, final boolean direct) {
if (scheduleIfPossible(ref)) {
return;
}
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
if (prop != null) {
@ -74,18 +78,7 @@ public class LastValueQueue extends QueueImpl {
if (hr != null) {
// We need to overwrite the old ref with the new one and ack the old one
MessageReference oldRef = hr.getReference();
referenceHandled();
try {
oldRef.acknowledge();
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
}
hr.setReference(ref);
replaceLVQMessage(ref, hr);
}
else {
@ -102,35 +95,60 @@ public class LastValueQueue extends QueueImpl {
}
@Override
public synchronized void addHead(final MessageReference ref) {
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
if (prop != null) {
HolderReference hr = map.get(prop);
if (hr != null) {
// We keep the current ref and ack the one we are returning
if (scheduling) {
// We need to overwrite the old ref with the new one and ack the old one
super.referenceHandled();
try {
super.acknowledge(ref);
replaceLVQMessage(ref, hr);
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
else {
// We keep the current ref and ack the one we are returning
super.referenceHandled();
try {
super.acknowledge(ref);
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
}
}
}
else {
map.put(prop, (HolderReference) ref);
hr = new HolderReference(prop, ref);
super.addHead(ref);
map.put(prop, hr);
super.addHead(hr, scheduling);
}
}
else {
super.addHead(ref);
super.addHead(ref, scheduling);
}
}
private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
MessageReference oldRef = hr.getReference();
referenceHandled();
try {
oldRef.acknowledge();
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
}
hr.setReference(ref);
}
@Override
protected void refRemoved(MessageReference ref) {
synchronized (this) {

View File

@ -483,9 +483,9 @@ public class QueueImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
public synchronized void addHead(final MessageReference ref) {
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
flushDeliveriesInTransit();
if (scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
return;
}
@ -496,10 +496,10 @@ public class QueueImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
public synchronized void addHead(final List<MessageReference> refs) {
public synchronized void addHead(final List<MessageReference> refs, boolean scheduling) {
flushDeliveriesInTransit();
for (MessageReference ref : refs) {
addHead(ref);
addHead(ref, scheduling);
}
resetAllIterators();
@ -526,11 +526,7 @@ public class QueueImpl implements Queue {
@Override
public void addTail(final MessageReference ref, final boolean direct) {
if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) {
synchronized (this) {
messagesAdded++;
}
if (scheduleIfPossible(ref)) {
return;
}
@ -572,6 +568,17 @@ public class QueueImpl implements Queue {
deliverAsync();
}
protected boolean scheduleIfPossible(MessageReference ref) {
if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) {
synchronized (this) {
messagesAdded++;
}
return true;
}
return false;
}
/**
* This will wait for any pending deliveries to finish
*/
@ -1110,7 +1117,7 @@ public class QueueImpl implements Queue {
ref.getMessage().putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, ref.getScheduledDeliveryTime());
ref.setScheduledDeliveryTime(0);
}
this.addHead(scheduledMessages);
this.addHead(scheduledMessages, true);
}
}
@ -2509,7 +2516,7 @@ public class QueueImpl implements Queue {
}
// The message failed to be delivered, hence we try again
addHead(reference);
addHead(reference, false);
}
}
}
@ -2634,7 +2641,7 @@ public class QueueImpl implements Queue {
}
void postRollback(final LinkedList<MessageReference> refs) {
addHead(refs);
addHead(refs, false);
}
private long calculateRedeliveryDelay(final AddressSettings addressSettings, final int deliveryCount) {

View File

@ -235,7 +235,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
if (trace) {
ActiveMQServerLogger.LOGGER.trace("Delivering " + list.size() + " elements on list to queue " + queue);
}
queue.addHead(list);
queue.addHead(list, true);
}
// Just to speed up GC

View File

@ -939,12 +939,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void addHead(MessageReference ref) {
public void addHead(MessageReference ref, boolean scheduling) {
}
@Override
public void addHead(List<MessageReference> refs) {
public void addHead(List<MessageReference> refs, boolean scheduling) {
for (MessageReference ref : refs) {
addFirst(ref);
}

View File

@ -504,6 +504,43 @@ public class LVQTest extends ActiveMQTestBase {
assertEquals(0, queue.getDeliveringCount());
}
@Test
public void testScheduledMessages() throws Exception {
final long DELAY_TIME = 5000;
final int MESSAGE_COUNT = 5;
Queue queue = server.locateQueue(qName1);
ClientProducer producer = clientSession.createProducer(address);
ClientConsumer consumer = clientSession.createConsumer(qName1);
SimpleString rh = new SimpleString("SMID1");
long timeSent = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
ClientMessage m = createTextMessage(clientSession, "m" + i);
m.setDurable(true);
m.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
timeSent = System.currentTimeMillis();
m.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timeSent + DELAY_TIME);
producer.send(m);
Thread.sleep(100);
}
// allow schedules to elapse so the messages will be delivered to the queue
long start = System.currentTimeMillis();
while (queue.getScheduledCount() > 0 && System.currentTimeMillis() - start <= DELAY_TIME) {
Thread.sleep(50);
}
assertTrue(queue.getScheduledCount() == 0);
clientSession.start();
ClientMessage m = consumer.receive(DELAY_TIME);
assertNotNull(m);
long actualDelay = System.currentTimeMillis() - timeSent + 50;
assertTrue(actualDelay >= DELAY_TIME);
m.acknowledge();
assertEquals(m.getBodyBuffer().readString(), "m" + (MESSAGE_COUNT - 1));
assertEquals(0, queue.getScheduledCount());
}
@Test
public void testMultipleAcksPersistedCorrectly2() throws Exception {

View File

@ -236,7 +236,7 @@ public class QueueImplTest extends ActiveMQTestBase {
MessageReference messageReference = generateReference(queue, 1);
queue.addConsumer(consumer);
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
queue.addHead(messageReference);
queue.addHead(messageReference, false);
boolean gotLatch = countDownLatch.await(3000, TimeUnit.MILLISECONDS);
Assert.assertTrue(gotLatch);

View File

@ -102,13 +102,13 @@ public class FakeQueue implements Queue {
}
@Override
public void addHead(MessageReference ref) {
public void addHead(MessageReference ref, boolean scheduling) {
// no-op
}
@Override
public void addHead(List<MessageReference> ref) {
public void addHead(List<MessageReference> ref, boolean scheduling) {
// no-op
}

View File

@ -376,7 +376,7 @@ public class QueueImplTest extends ActiveMQTestBase {
refs2.addFirst(ref);
queue.addHead(ref);
queue.addHead(ref, false);
}
List<MessageReference> refs3 = new ArrayList<>();
@ -1039,9 +1039,9 @@ public class QueueImplTest extends ActiveMQTestBase {
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
queue.addHead(messageReference);
queue.addHead(messageReference, false);
queue.addTail(messageReference2);
queue.addHead(messageReference3);
queue.addHead(messageReference3, false);
Assert.assertEquals(0, consumer.getReferences().size());
queue.addConsumer(consumer);
@ -1071,9 +1071,9 @@ public class QueueImplTest extends ActiveMQTestBase {
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
queue.addHead(messageReference);
queue.addHead(messageReference2);
queue.addHead(messageReference3);
queue.addHead(messageReference, false);
queue.addHead(messageReference2, false);
queue.addHead(messageReference3, false);
Assert.assertEquals(queue.getReference(2), messageReference2);
}
@ -1084,9 +1084,9 @@ public class QueueImplTest extends ActiveMQTestBase {
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
queue.addHead(messageReference);
queue.addHead(messageReference2);
queue.addHead(messageReference3);
queue.addHead(messageReference, false);
queue.addHead(messageReference2, false);
queue.addHead(messageReference3, false);
Assert.assertNull(queue.getReference(5));
}
@ -1231,7 +1231,7 @@ public class QueueImplTest extends ActiveMQTestBase {
@Override
public void run() {
if (first) {
queue.addHead(messageReference);
queue.addHead(messageReference, false);
}
else {
queue.addTail(messageReference);