diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 630fe73f45..99b36a566f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -191,6 +192,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us this.usageManager.getMemoryUsage().removeUsageListener(this); ArrayList topicsToDeactivate = new ArrayList(); + List savedDispateched = null; + synchronized (pendingLock) { pending.stop(); @@ -224,6 +227,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } } + if (!topicsToDeactivate.isEmpty()) { + savedDispateched = new ArrayList(dispatched); + } dispatched.clear(); } if (!keepDurableSubsActive && pending.isTransient()) { @@ -240,7 +246,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } } for(Topic topic: topicsToDeactivate) { - topic.deactivate(context, this); + topic.deactivate(context, this, savedDispateched); } prefetchExtension.set(0); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index c9189df727..e46d80c9d3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -582,7 +582,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } - @Override + @Override public void add(ConnectionContext context, Destination destination) throws Exception { synchronized(pendingLock) { super.add(context, destination); @@ -592,6 +592,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription { @Override public List remove(ConnectionContext context, Destination destination) throws Exception { + return remove(context, destination, dispatched); + } + + public List remove(ConnectionContext context, Destination destination, List dispatched) throws Exception { List rc = new ArrayList(); synchronized(pendingLock) { super.remove(context, destination); @@ -600,23 +604,35 @@ public abstract class PrefetchSubscription extends AbstractSubscription { // Except if each commit or rollback callback action comes before remove of subscriber. rc.addAll(pending.remove(context, destination)); - // Synchronized to DispatchLock - synchronized(dispatchLock) { - ArrayList references = new ArrayList(); - for (MessageReference r : dispatched) { - if( r.getRegionDestination() == destination) { - references.add(r); - } + if (dispatched == null) { + return rc; + } + + // Synchronized to DispatchLock if necessary + if (dispatched == this.dispatched) { + synchronized(dispatchLock) { + updateDestinationStats(rc, destination, dispatched); } - rc.addAll(references); - destination.getDestinationStatistics().getDispatched().subtract(references.size()); - destination.getDestinationStatistics().getInflight().subtract(references.size()); - dispatched.removeAll(references); + } else { + updateDestinationStats(rc, destination, dispatched); } } return rc; } + private void updateDestinationStats(List rc, Destination destination, List dispatched) { + ArrayList references = new ArrayList(); + for (MessageReference r : dispatched) { + if (r.getRegionDestination() == destination) { + references.add(r); + } + } + rc.addAll(references); + destination.getDestinationStatistics().getDispatched().subtract(references.size()); + destination.getDestinationStatistics().getInflight().subtract(references.size()); + dispatched.removeAll(references); + } + protected void dispatchPending() throws IOException { synchronized(pendingLock) { try { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 7e4460513d..358fc8a195 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -73,6 +73,7 @@ public class Topic extends BaseDestination implements Task { private final TaskRunner taskRunner; private final LinkedList messagesWaitingForSpace = new LinkedList(); private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { + @Override public void run() { try { Topic.this.taskRunner.wakeup(); @@ -106,6 +107,7 @@ public class Topic extends BaseDestination implements Task { } } + @Override public List getConsumers() { synchronized (consumers) { return new ArrayList(consumers); @@ -116,6 +118,7 @@ public class Topic extends BaseDestination implements Task { return true; } + @Override public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { if (!sub.getConsumerInfo().isDurable()) { @@ -182,6 +185,7 @@ public class Topic extends BaseDestination implements Task { } } + @Override public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { if (!sub.getConsumerInfo().isDurable()) { super.removeSubscription(context, sub, lastDeliveredSequenceId); @@ -228,13 +232,13 @@ public class Topic extends BaseDestination implements Task { topicStore.deleteSubscription(clientId, subscriptionName); info = null; synchronized (consumers) { - consumers.remove(subscription); + consumers.remove(subscription); } } else { synchronized (consumers) { - if (!consumers.contains(subscription)) { - consumers.add(subscription); - } + if (!consumers.contains(subscription)) { + consumers.add(subscription); + } } } } @@ -259,6 +263,7 @@ public class Topic extends BaseDestination implements Task { msgContext.setDestination(destination); if (subscription.isRecoveryRequired()) { topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { + @Override public boolean recoverMessage(Message message) throws Exception { message.setRegionDestination(Topic.this); try { @@ -272,14 +277,17 @@ public class Topic extends BaseDestination implements Task { return true; } + @Override public boolean recoverMessageReference(MessageId messageReference) throws Exception { throw new RuntimeException("Should not be called."); } + @Override public boolean hasSpace() { return true; } + @Override public boolean isDuplicate(MessageId id) { return false; } @@ -290,11 +298,11 @@ public class Topic extends BaseDestination implements Task { } } - public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception { + public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List dispatched) throws Exception { synchronized (consumers) { consumers.remove(sub); } - sub.remove(context, this); + sub.remove(context, this, dispatched); } protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception { @@ -303,6 +311,7 @@ public class Topic extends BaseDestination implements Task { } } + @Override public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { final ConnectionContext context = producerExchange.getConnectionContext(); @@ -348,6 +357,7 @@ public class Topic extends BaseDestination implements Task { if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { synchronized (messagesWaitingForSpace) { messagesWaitingForSpace.add(new Runnable() { + @Override public void run() { try { @@ -377,7 +387,6 @@ public class Topic extends BaseDestination implements Task { context.getConnection().dispatchAsync(response); } } - } }); @@ -521,6 +530,7 @@ public class Topic extends BaseDestination implements Task { return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); } + @Override public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException { if (topicStore != null && node.isPersistent()) { @@ -532,6 +542,7 @@ public class Topic extends BaseDestination implements Task { messageConsumed(context, node); } + @Override public void gc() { } @@ -539,6 +550,7 @@ public class Topic extends BaseDestination implements Task { return topicStore != null ? topicStore.getMessage(messageId) : null; } + @Override public void start() throws Exception { this.subscriptionRecoveryPolicy.start(); if (memoryUsage != null) { @@ -550,6 +562,7 @@ public class Topic extends BaseDestination implements Task { } } + @Override public void stop() throws Exception { if (taskRunner != null) { taskRunner.shutdown(); @@ -565,6 +578,7 @@ public class Topic extends BaseDestination implements Task { scheduler.cancel(expireMessagesTask); } + @Override public Message[] browse() { final List result = new ArrayList(); doBrowse(result, getMaxBrowsePageSize()); @@ -576,6 +590,7 @@ public class Topic extends BaseDestination implements Task { if (topicStore != null) { final List toExpire = new ArrayList(); topicStore.recover(new MessageRecoveryListener() { + @Override public boolean recoverMessage(Message message) throws Exception { if (message.isExpired()) { toExpire.add(message); @@ -584,14 +599,17 @@ public class Topic extends BaseDestination implements Task { return true; } + @Override public boolean recoverMessageReference(MessageId messageReference) throws Exception { return true; } + @Override public boolean hasSpace() { return browseList.size() < max; } + @Override public boolean isDuplicate(MessageId id) { return false; } @@ -616,6 +634,7 @@ public class Topic extends BaseDestination implements Task { } } + @Override public boolean iterate() { synchronized (messagesWaitingForSpace) { while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { @@ -661,6 +680,7 @@ public class Topic extends BaseDestination implements Task { // Implementation methods // ------------------------------------------------------------------------- + @Override public final void wakeup() { } @@ -698,12 +718,14 @@ public class Topic extends BaseDestination implements Task { } private final Runnable expireMessagesTask = new Runnable() { + @Override public void run() { List browsedMessages = new InsertionCountList(); doBrowse(browsedMessages, getMaxExpirePageSize()); } }; + @Override public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { broker.messageExpired(context, reference, subs); // AMQ-2586: Better to leave this stat at zero than to give the user @@ -760,6 +782,7 @@ public class Topic extends BaseDestination implements Task { /** * force a reread of the store - after transaction recovery completion */ + @Override public void clearPendingMessages() { dispatchLock.readLock().lock(); try {