diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index c3de541c88..3438dccd16 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -49,12 +49,7 @@ import javax.jms.ResourceAllocationException; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.region.cursors.OrderedPendingList; -import org.apache.activemq.broker.region.cursors.PendingList; -import org.apache.activemq.broker.region.cursors.PendingMessageCursor; -import org.apache.activemq.broker.region.cursors.PrioritizedPendingList; -import org.apache.activemq.broker.region.cursors.StoreQueueCursor; -import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; +import org.apache.activemq.broker.region.cursors.*; import org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory; import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupMapFactory; @@ -109,8 +104,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index private final PendingList pagedInMessages = new OrderedPendingList(); // Messages that are paged in but have not yet been targeted at a subscription private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); - protected PendingList pagedInPendingDispatch = new OrderedPendingList(); - protected PendingList redeliveredWaitingDispatch = new OrderedPendingList(); + protected QueueDispatchPendingList dispatchPendingList = new QueueDispatchPendingList(); private MessageGroupMap messageGroupOwners; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory(); @@ -343,14 +337,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index @Override public void setPrioritizedMessages(boolean prioritizedMessages) { super.setPrioritizedMessages(prioritizedMessages); - - if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) { - pagedInPendingDispatch = new PrioritizedPendingList(); - redeliveredWaitingDispatch = new PrioritizedPendingList(); - } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) { - pagedInPendingDispatch = new OrderedPendingList(); - redeliveredWaitingDispatch = new OrderedPendingList(); - } + dispatchPendingList.setPrioritizedMessages(prioritizedMessages); } @Override @@ -583,7 +570,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } if (!qmr.isDropped()) { - redeliveredWaitingDispatch.addMessageLast(qmr); + dispatchPendingList.addMessageForRedelivery(qmr); } } if (sub instanceof QueueBrowserSubscription) { @@ -591,7 +578,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index browserDispatches.remove(sub); } // AMQ-5107: don't resend if the broker is shutting down - if (!redeliveredWaitingDispatch.isEmpty() && (! this.brokerService.isStopping())) { + if (dispatchPendingList.hasRedeliveries() && (! this.brokerService.isStopping())) { doDispatch(new OrderedPendingList()); } } finally { @@ -1118,8 +1105,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pageInMessages(!memoryUsage.isFull(110)); }; - doBrowseList(browseList, max, redeliveredWaitingDispatch, pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch"); - doBrowseList(browseList, max, pagedInPendingDispatch, pagedInPendingDispatchLock, connectionContext, "pagedInPendingDispatch"); + doBrowseList(browseList, max, dispatchPendingList, pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch+pagedInPendingDispatch"); doBrowseList(browseList, max, pagedInMessages, pagedInMessagesLock, connectionContext, "pagedInMessages"); // we need a store iterator to walk messages on disk, independent of the cursor which is tracking @@ -1581,7 +1567,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pagedInPendingDispatchLock.readLock().lock(); try { - pageInMoreMessages |= !pagedInPendingDispatch.isEmpty(); + pageInMoreMessages |= !dispatchPendingList.isEmpty(); } finally { pagedInPendingDispatchLock.readLock().unlock(); } @@ -1593,7 +1579,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // then we do a dispatch. boolean hasBrowsers = browserDispatches.size() > 0; - if (pageInMoreMessages || hasBrowsers || !redeliveredWaitingDispatch.isEmpty()) { + if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) { try { pageInMessages(hasBrowsers); } catch (Throwable e) { @@ -1710,7 +1696,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index removeMessage(c, null, r); pagedInPendingDispatchLock.writeLock().lock(); try { - pagedInPendingDispatch.remove(r); + dispatchPendingList.remove(r); } finally { pagedInPendingDispatchLock.writeLock().unlock(); } @@ -1857,13 +1843,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index PendingList newlyPaged = doPageInForDispatch(force, processExpired); pagedInPendingDispatchLock.writeLock().lock(); try { - if (pagedInPendingDispatch.isEmpty()) { - pagedInPendingDispatch.addAll(newlyPaged); + if (dispatchPendingList.isEmpty()) { + dispatchPendingList.addAll(newlyPaged); } else { for (MessageReference qmr : newlyPaged) { - if (!pagedInPendingDispatch.contains(qmr)) { - pagedInPendingDispatch.addMessageLast(qmr); + if (!dispatchPendingList.contains(qmr)) { + dispatchPendingList.addMessageLast(qmr); } } } @@ -1880,7 +1866,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index int pagedInPendingSize = 0; pagedInPendingDispatchLock.readLock().lock(); try { - pagedInPendingSize = pagedInPendingDispatch.size(); + pagedInPendingSize = dispatchPendingList.size(); } finally { pagedInPendingDispatchLock.readLock().unlock(); } @@ -1973,27 +1959,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pagedInPendingDispatchLock.writeLock().lock(); try { - if (!redeliveredWaitingDispatch.isEmpty()) { - // Try first to dispatch redelivered messages to keep an - // proper order - redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch); - } - if (redeliveredWaitingDispatch.isEmpty()) { - if (!pagedInPendingDispatch.isEmpty()) { - // Next dispatch anything that had not been - // dispatched before. - pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch); - } - } + doActualDispatch(dispatchPendingList); // and now see if we can dispatch the new stuff.. and append to the pending // list anything that does not actually get dispatched. if (list != null && !list.isEmpty()) { - if (redeliveredWaitingDispatch.isEmpty() && pagedInPendingDispatch.isEmpty()) { - pagedInPendingDispatch.addAll(doActualDispatch(list)); + if (dispatchPendingList.isEmpty()) { + dispatchPendingList.addAll(doActualDispatch(list)); } else { for (MessageReference qmr : list) { - if (!pagedInPendingDispatch.contains(qmr)) { - pagedInPendingDispatch.addMessageLast(qmr); + if (!dispatchPendingList.contains(qmr)) { + dispatchPendingList.addMessageLast(qmr); } } doWakeUp = true; @@ -2192,10 +2167,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pagedInPendingDispatchLock.writeLock().lock(); try { - for (MessageReference ref : pagedInPendingDispatch) { + for (MessageReference ref : dispatchPendingList) { if (messageId.equals(ref.getMessageId())) { message = (QueueMessageReference)ref; - pagedInPendingDispatch.remove(ref); + dispatchPendingList.remove(ref); break; } } @@ -2245,7 +2220,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index throw new JMSException("Slave broker out of sync with master - Message: " + messageDispatchNotification.getMessageId() + " on " + messageDispatchNotification.getDestination() + " does not exist among pending(" - + pagedInPendingDispatch.size() + ") for subscription: " + + dispatchPendingList.size() + ") for subscription: " + messageDispatchNotification.getConsumerId()); } return message; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java new file mode 100644 index 0000000000..8c6032b2e1 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.QueueMessageReference; +import org.apache.activemq.command.MessageId; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +/** + * An abstraction that keeps the correct order of messages that need to be dispatched + * to consumers, but also hides the fact that there might be redelivered messages that + * should be dispatched ahead of any other paged in messages. + * + * Direct usage of this class is recommended as you can control when redeliveries need + * to be added vs regular pending messages (the next set of messages that can be dispatched) + * + * Created by ceposta + *