Fix for https://issues.apache.org/jira/browse/AMQ-5689 Queue dispatching hangs when there are redelivered messages that dont match current consumers selectors, refactored out the pendingDispatchList in Queue implementation

This commit is contained in:
Christian Posta 2015-03-26 15:48:34 -07:00
parent f56ea45e58
commit efc9a8d578
4 changed files with 252 additions and 50 deletions

View File

@ -49,12 +49,7 @@ import javax.jms.ResourceAllocationException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.cursors.OrderedPendingList;
import org.apache.activemq.broker.region.cursors.PendingList;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PrioritizedPendingList;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.*;
import org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
@ -109,8 +104,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
private final PendingList pagedInMessages = new OrderedPendingList();
// Messages that are paged in but have not yet been targeted at a subscription
private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
protected PendingList pagedInPendingDispatch = new OrderedPendingList();
protected PendingList redeliveredWaitingDispatch = new OrderedPendingList();
protected QueueDispatchPendingList dispatchPendingList = new QueueDispatchPendingList();
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory();
@ -343,14 +337,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
@Override
public void setPrioritizedMessages(boolean prioritizedMessages) {
super.setPrioritizedMessages(prioritizedMessages);
if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
pagedInPendingDispatch = new PrioritizedPendingList();
redeliveredWaitingDispatch = new PrioritizedPendingList();
} else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
pagedInPendingDispatch = new OrderedPendingList();
redeliveredWaitingDispatch = new OrderedPendingList();
}
dispatchPendingList.setPrioritizedMessages(prioritizedMessages);
}
@Override
@ -583,7 +570,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
}
if (!qmr.isDropped()) {
redeliveredWaitingDispatch.addMessageLast(qmr);
dispatchPendingList.addMessageForRedelivery(qmr);
}
}
if (sub instanceof QueueBrowserSubscription) {
@ -591,7 +578,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
browserDispatches.remove(sub);
}
// AMQ-5107: don't resend if the broker is shutting down
if (!redeliveredWaitingDispatch.isEmpty() && (! this.brokerService.isStopping())) {
if (dispatchPendingList.hasRedeliveries() && (! this.brokerService.isStopping())) {
doDispatch(new OrderedPendingList());
}
} finally {
@ -1118,8 +1105,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pageInMessages(!memoryUsage.isFull(110));
};
doBrowseList(browseList, max, redeliveredWaitingDispatch, pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch");
doBrowseList(browseList, max, pagedInPendingDispatch, pagedInPendingDispatchLock, connectionContext, "pagedInPendingDispatch");
doBrowseList(browseList, max, dispatchPendingList, pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch+pagedInPendingDispatch");
doBrowseList(browseList, max, pagedInMessages, pagedInMessagesLock, connectionContext, "pagedInMessages");
// we need a store iterator to walk messages on disk, independent of the cursor which is tracking
@ -1581,7 +1567,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pagedInPendingDispatchLock.readLock().lock();
try {
pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
pageInMoreMessages |= !dispatchPendingList.isEmpty();
} finally {
pagedInPendingDispatchLock.readLock().unlock();
}
@ -1593,7 +1579,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// then we do a dispatch.
boolean hasBrowsers = browserDispatches.size() > 0;
if (pageInMoreMessages || hasBrowsers || !redeliveredWaitingDispatch.isEmpty()) {
if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) {
try {
pageInMessages(hasBrowsers);
} catch (Throwable e) {
@ -1710,7 +1696,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
removeMessage(c, null, r);
pagedInPendingDispatchLock.writeLock().lock();
try {
pagedInPendingDispatch.remove(r);
dispatchPendingList.remove(r);
} finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
@ -1857,13 +1843,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
PendingList newlyPaged = doPageInForDispatch(force, processExpired);
pagedInPendingDispatchLock.writeLock().lock();
try {
if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(newlyPaged);
if (dispatchPendingList.isEmpty()) {
dispatchPendingList.addAll(newlyPaged);
} else {
for (MessageReference qmr : newlyPaged) {
if (!pagedInPendingDispatch.contains(qmr)) {
pagedInPendingDispatch.addMessageLast(qmr);
if (!dispatchPendingList.contains(qmr)) {
dispatchPendingList.addMessageLast(qmr);
}
}
}
@ -1880,7 +1866,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
int pagedInPendingSize = 0;
pagedInPendingDispatchLock.readLock().lock();
try {
pagedInPendingSize = pagedInPendingDispatch.size();
pagedInPendingSize = dispatchPendingList.size();
} finally {
pagedInPendingDispatchLock.readLock().unlock();
}
@ -1973,27 +1959,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pagedInPendingDispatchLock.writeLock().lock();
try {
if (!redeliveredWaitingDispatch.isEmpty()) {
// Try first to dispatch redelivered messages to keep an
// proper order
redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
}
if (redeliveredWaitingDispatch.isEmpty()) {
if (!pagedInPendingDispatch.isEmpty()) {
// Next dispatch anything that had not been
// dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
}
}
doActualDispatch(dispatchPendingList);
// and now see if we can dispatch the new stuff.. and append to the pending
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
if (redeliveredWaitingDispatch.isEmpty() && pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(doActualDispatch(list));
if (dispatchPendingList.isEmpty()) {
dispatchPendingList.addAll(doActualDispatch(list));
} else {
for (MessageReference qmr : list) {
if (!pagedInPendingDispatch.contains(qmr)) {
pagedInPendingDispatch.addMessageLast(qmr);
if (!dispatchPendingList.contains(qmr)) {
dispatchPendingList.addMessageLast(qmr);
}
}
doWakeUp = true;
@ -2192,10 +2167,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pagedInPendingDispatchLock.writeLock().lock();
try {
for (MessageReference ref : pagedInPendingDispatch) {
for (MessageReference ref : dispatchPendingList) {
if (messageId.equals(ref.getMessageId())) {
message = (QueueMessageReference)ref;
pagedInPendingDispatch.remove(ref);
dispatchPendingList.remove(ref);
break;
}
}
@ -2245,7 +2220,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
throw new JMSException("Slave broker out of sync with master - Message: "
+ messageDispatchNotification.getMessageId() + " on "
+ messageDispatchNotification.getDestination() + " does not exist among pending("
+ pagedInPendingDispatch.size() + ") for subscription: "
+ dispatchPendingList.size() + ") for subscription: "
+ messageDispatchNotification.getConsumerId());
}
return message;

View File

@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.MessageId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
/**
* An abstraction that keeps the correct order of messages that need to be dispatched
* to consumers, but also hides the fact that there might be redelivered messages that
* should be dispatched ahead of any other paged in messages.
*
* Direct usage of this class is recommended as you can control when redeliveries need
* to be added vs regular pending messages (the next set of messages that can be dispatched)
*
* Created by ceposta
* <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
*/
public class QueueDispatchPendingList implements PendingList {
private PendingList pagedInPendingDispatch = new OrderedPendingList();
private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
@Override
public boolean isEmpty() {
return pagedInPendingDispatch.isEmpty() && redeliveredWaitingDispatch.isEmpty();
}
@Override
public void clear() {
pagedInPendingDispatch.clear();
redeliveredWaitingDispatch.clear();
}
/**
* Messages added are added directly to the pagedInPendingDispatch set of messages. If
* you're trying to add a message that is marked redelivered add it using addMessageForRedelivery()
* method
* @param message
* The MessageReference that is to be added to this list.
*
* @return
*/
@Override
public PendingNode addMessageFirst(MessageReference message) {
return pagedInPendingDispatch.addMessageFirst(message);
}
/**
* Messages added are added directly to the pagedInPendingDispatch set of messages. If
* you're trying to add a message that is marked redelivered add it using addMessageForRedelivery()
* method
* @param message
* The MessageReference that is to be added to this list.
*
* @return
*/
@Override
public PendingNode addMessageLast(MessageReference message) {
return pagedInPendingDispatch.addMessageLast(message);
}
@Override
public PendingNode remove(MessageReference message) {
if (pagedInPendingDispatch.contains(message)) {
return pagedInPendingDispatch.remove(message);
}else if (redeliveredWaitingDispatch.contains(message)) {
return redeliveredWaitingDispatch.remove(message);
}
return null;
}
@Override
public int size() {
return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size();
}
@Override
public Iterator<MessageReference> iterator() {
return new Iterator<MessageReference>() {
Iterator<MessageReference> redeliveries = redeliveredWaitingDispatch.iterator();
Iterator<MessageReference> pendingDispatch = pagedInPendingDispatch.iterator();
Iterator<MessageReference> current = redeliveries;
@Override
public boolean hasNext() {
if (!redeliveries.hasNext() && (current == redeliveries)) {
current = pendingDispatch;
}
return current.hasNext();
}
@Override
public MessageReference next() {
return current.next();
}
@Override
public void remove() {
current.remove();
}
};
}
@Override
public boolean contains(MessageReference message) {
return pagedInPendingDispatch.contains(message) || redeliveredWaitingDispatch.contains(message);
}
@Override
public Collection<MessageReference> values() {
List<MessageReference> messageReferences = new ArrayList<MessageReference>();
Iterator<MessageReference> iterator = iterator();
while (iterator.hasNext()) {
messageReferences.add(iterator.next());
}
return messageReferences;
}
@Override
public void addAll(PendingList pendingList) {
pagedInPendingDispatch.addAll(pendingList);
}
@Override
public MessageReference get(MessageId messageId) {
MessageReference rc = pagedInPendingDispatch.get(messageId);
if (rc == null) {
return redeliveredWaitingDispatch.get(messageId);
}
return rc;
}
public void setPrioritizedMessages(boolean prioritizedMessages) {
if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
pagedInPendingDispatch = new PrioritizedPendingList();
redeliveredWaitingDispatch = new PrioritizedPendingList();
} else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
pagedInPendingDispatch = new OrderedPendingList();
redeliveredWaitingDispatch = new OrderedPendingList();
}
}
public void addMessageForRedelivery(QueueMessageReference qmr) {
redeliveredWaitingDispatch.addMessageLast(qmr);
}
public boolean hasRedeliveries(){
return !redeliveredWaitingDispatch.isEmpty();
}
}

View File

@ -17,6 +17,8 @@
package org.apache.activemq;
import javax.jms.*;
/**
*
*/
@ -25,4 +27,54 @@ public class JmsQueueSelectorTest extends JmsTopicSelectorTest {
topic = false;
super.setUp();
}
public void testRedeliveryWithSelectors() throws Exception {
consumer = createConsumer("");
// send a message that would go to this consumer, but not to the next consumer we open
TextMessage message = session.createTextMessage("1");
message.setIntProperty("id", 1);
message.setJMSType("b");
message.setStringProperty("stringProperty", "b");
message.setLongProperty("longProperty", 1);
message.setBooleanProperty("booleanProperty", true);
producer.send(message);
// don't consume any messages.. close the consumer so that messages that had
// been dispatched get marked as delivered, and queued for redelivery
consumer.close();
// send a message that will match the selector for the next consumer
message = session.createTextMessage("1");
message.setIntProperty("id", 1);
message.setJMSType("a");
message.setStringProperty("stringProperty", "a");
message.setLongProperty("longProperty", 1);
message.setBooleanProperty("booleanProperty", true);
producer.send(message);
consumer = createConsumer("stringProperty = 'a' and longProperty = 1 and booleanProperty = true");
// now we, should only receive 1 message, not two
int remaining = 2;
javax.jms.Message recievedMsg = null;
while (true) {
recievedMsg = consumer.receive(1000);
if (recievedMsg == null) {
break;
}
String text = ((TextMessage)recievedMsg).getText();
if (!text.equals("1") && !text.equals("3")) {
fail("unexpected message: " + text);
}
remaining--;
}
assertEquals(1, remaining);
consumer.close();
consumeMessages(remaining);
}
}

View File

@ -156,11 +156,12 @@ public class JmsTopicSelectorTest extends TestSupport {
remaining--;
}
assertEquals(remaining, 0);
assertEquals(0, remaining);
consumer.close();
consumeMessages(remaining);
}
public void testPropertySelector() throws Exception {
int remaining = 5;
Message message = null;
@ -177,7 +178,7 @@ public class JmsTopicSelectorTest extends TestSupport {
}
remaining--;
}
assertEquals(remaining, 3);
assertEquals(3, remaining);
consumer.close();
consumeMessages(remaining);
@ -199,7 +200,7 @@ public class JmsTopicSelectorTest extends TestSupport {
}
remaining--;
}
assertEquals(remaining, 2);
assertEquals(2, remaining);
consumer.close();
consumeMessages(remaining);