diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index f82a6e775f..a9a9b6cdb6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -17,6 +17,9 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import javax.jms.InvalidSelectorException; @@ -104,8 +107,9 @@ public abstract class AbstractSubscription implements Subscription { destinations.add(destination); } - public void remove(ConnectionContext context, Destination destination) throws Exception { + public List remove(ConnectionContext context, Destination destination) throws Exception { destinations.remove(destination); + return Collections.EMPTY_LIST; } public ConsumerInfo getConsumerInfo() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 2bba05edca..06bbe0f623 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -436,11 +436,18 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } - public void remove(ConnectionContext context, Destination destination) throws Exception { + public List remove(ConnectionContext context, Destination destination) throws Exception { + List rc = new ArrayList(); synchronized(pendingLock) { super.remove(context, destination); - pending.remove(context, destination); + for (MessageReference r : dispatched) { + if( r.getRegionDestination() == destination ) { + rc.add((QueueMessageReference)r); + } + } + rc.addAll(pending.remove(context, destination)); } + return rc; } protected void dispatchPending() throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 921fb40e5e..a3c5aadc7a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -292,25 +292,20 @@ public class Queue extends BaseDestination implements Task { ConsumerId consumerId = sub.getConsumerInfo().getConsumerId(); MessageGroupSet ownedGroups = getMessageGroupOwners() .removeConsumer(consumerId); + // redeliver inflight messages - sub.remove(context, this); - List list = new ArrayList(); - List inFlight = null; - synchronized(pagedInMessages) { - inFlight = new ArrayList(pagedInMessages.values()); + for (MessageReference ref : sub.remove(context, this)) { + QueueMessageReference qmr = (QueueMessageReference)ref; + qmr.incrementRedeliveryCounter(); + if( qmr.getLockOwner()==sub ) { + qmr.unlock(); + qmr.incrementRedeliveryCounter(); + } + list.add(qmr); } - for (QueueMessageReference node:inFlight){ - if (!node.isDropped() && !node.isAcked() - && node.getLockOwner() == sub) { - if (node.unlock()) { - node.incrementRedeliveryCounter(); - list.add(node); - } - } - } - if (list != null && !consumers.isEmpty()) { + if (!list.isEmpty() && !consumers.isEmpty()) { doDispatch(list); } } @@ -938,6 +933,7 @@ public class Queue extends BaseDestination implements Task { if( rd.subscription instanceof QueueBrowserSubscription ) { ((QueueBrowserSubscription)rd.subscription).decrementQueueRef(); } + } catch (Exception e) { e.printStackTrace(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java index 0642be9d9d..3ec5955e95 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -87,8 +87,9 @@ public interface Subscription extends SubscriptionRecovery { * The subscription will be no longer be receiving messages from the destination. * @param context * @param destination + * @return a list of un-acked messages that were added to the subscription. */ - void remove(ConnectionContext context, Destination destination) throws Exception; + List remove(ConnectionContext context, Destination destination) throws Exception; /** * The ConsumerInfo object that created the subscription. diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index 62c92eb179..1b277242c7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -16,12 +16,15 @@ */ package org.apache.activemq.broker.region.cursors; +import java.util.Collections; import java.util.LinkedList; +import java.util.List; import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.QueueMessageReference; import org.apache.activemq.command.MessageId; import org.apache.activemq.usage.SystemUsage; @@ -59,7 +62,9 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor { public void add(ConnectionContext context, Destination destination) throws Exception { } - public void remove(ConnectionContext context, Destination destination) throws Exception { + @SuppressWarnings("unchecked") + public List remove(ConnectionContext context, Destination destination) throws Exception { + return Collections.EMPTY_LIST; } public boolean isRecoveryRequired() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 5ffbbb507b..00b6bca578 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -110,6 +110,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple } return isDiskListEmpty(); } + + /** * reset the cursor diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index 9b39e38a4b..44f3e7fd48 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -18,12 +18,14 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; import java.util.LinkedList; +import java.util.List; import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.QueueMessageReference; import org.apache.activemq.command.MessageId; import org.apache.activemq.usage.SystemUsage; @@ -51,7 +53,7 @@ public interface PendingMessageCursor extends Service { * @param destination * @throws Exception */ - void remove(ConnectionContext context, Destination destination) throws Exception; + List remove(ConnectionContext context, Destination destination) throws Exception; /** * @return true if there are no pending messages diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 7e7b7238dc..205abe4187 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -17,9 +17,11 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Map; import org.apache.activemq.advisory.AdvisorySupport; @@ -27,6 +29,7 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.QueueMessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; @@ -128,11 +131,12 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { * @param destination * @throws Exception */ - public synchronized void remove(ConnectionContext context, Destination destination) throws Exception { + public synchronized List remove(ConnectionContext context, Destination destination) throws Exception { Object tsp = topics.remove(destination); if (tsp != null) { storePrefetches.remove(tsp); } + return Collections.EMPTY_LIST; } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java index 958ce8898c..e4cc01f1a0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java @@ -16,8 +16,13 @@ */ package org.apache.activemq.broker.region.cursors; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.QueueMessageReference; @@ -32,6 +37,18 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { private Iterator iter; private MessageReference last; + + @Override + public List remove(ConnectionContext context, Destination destination) throws Exception { + List rc = new ArrayList(); + for (MessageReference r : list) { + if( r.getRegionDestination()==destination ) { + rc.add(r); + } + } + return rc ; + } + /** * @return true if there are no pending messages */