Ensure the destination statistics are updated on durable sub deactivate. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1476433 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-04-26 22:51:50 +00:00
parent 7057e819eb
commit 3039b67cb2
3 changed files with 65 additions and 20 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -191,6 +192,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
this.usageManager.getMemoryUsage().removeUsageListener(this);
ArrayList<Topic> topicsToDeactivate = new ArrayList<Topic>();
List<MessageReference> savedDispateched = null;
synchronized (pendingLock) {
pending.stop();
@ -224,6 +227,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
}
if (!topicsToDeactivate.isEmpty()) {
savedDispateched = new ArrayList<MessageReference>(dispatched);
}
dispatched.clear();
}
if (!keepDurableSubsActive && pending.isTransient()) {
@ -240,7 +246,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
}
for(Topic topic: topicsToDeactivate) {
topic.deactivate(context, this);
topic.deactivate(context, this, savedDispateched);
}
prefetchExtension.set(0);
}

View File

@ -592,6 +592,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
@Override
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
return remove(context, destination, dispatched);
}
public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
List<MessageReference> rc = new ArrayList<MessageReference>();
synchronized(pendingLock) {
super.remove(context, destination);
@ -600,11 +604,26 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// Except if each commit or rollback callback action comes before remove of subscriber.
rc.addAll(pending.remove(context, destination));
// Synchronized to DispatchLock
if (dispatched == null) {
return rc;
}
// Synchronized to DispatchLock if necessary
if (dispatched == this.dispatched) {
synchronized(dispatchLock) {
updateDestinationStats(rc, destination, dispatched);
}
} else {
updateDestinationStats(rc, destination, dispatched);
}
}
return rc;
}
private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) {
ArrayList<MessageReference> references = new ArrayList<MessageReference>();
for (MessageReference r : dispatched) {
if( r.getRegionDestination() == destination) {
if (r.getRegionDestination() == destination) {
references.add(r);
}
}
@ -613,9 +632,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
destination.getDestinationStatistics().getInflight().subtract(references.size());
dispatched.removeAll(references);
}
}
return rc;
}
protected void dispatchPending() throws IOException {
synchronized(pendingLock) {

View File

@ -73,6 +73,7 @@ public class Topic extends BaseDestination implements Task {
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
@Override
public void run() {
try {
Topic.this.taskRunner.wakeup();
@ -106,6 +107,7 @@ public class Topic extends BaseDestination implements Task {
}
}
@Override
public List<Subscription> getConsumers() {
synchronized (consumers) {
return new ArrayList<Subscription>(consumers);
@ -116,6 +118,7 @@ public class Topic extends BaseDestination implements Task {
return true;
}
@Override
public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
if (!sub.getConsumerInfo().isDurable()) {
@ -182,6 +185,7 @@ public class Topic extends BaseDestination implements Task {
}
}
@Override
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
if (!sub.getConsumerInfo().isDurable()) {
super.removeSubscription(context, sub, lastDeliveredSequenceId);
@ -259,6 +263,7 @@ public class Topic extends BaseDestination implements Task {
msgContext.setDestination(destination);
if (subscription.isRecoveryRequired()) {
topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
@Override
public boolean recoverMessage(Message message) throws Exception {
message.setRegionDestination(Topic.this);
try {
@ -272,14 +277,17 @@ public class Topic extends BaseDestination implements Task {
return true;
}
@Override
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
throw new RuntimeException("Should not be called.");
}
@Override
public boolean hasSpace() {
return true;
}
@Override
public boolean isDuplicate(MessageId id) {
return false;
}
@ -290,11 +298,11 @@ public class Topic extends BaseDestination implements Task {
}
}
public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
synchronized (consumers) {
consumers.remove(sub);
}
sub.remove(context, this);
sub.remove(context, this, dispatched);
}
protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
@ -303,6 +311,7 @@ public class Topic extends BaseDestination implements Task {
}
}
@Override
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
@ -348,6 +357,7 @@ public class Topic extends BaseDestination implements Task {
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
synchronized (messagesWaitingForSpace) {
messagesWaitingForSpace.add(new Runnable() {
@Override
public void run() {
try {
@ -377,7 +387,6 @@ public class Topic extends BaseDestination implements Task {
context.getConnection().dispatchAsync(response);
}
}
}
});
@ -521,6 +530,7 @@ public class Topic extends BaseDestination implements Task {
return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
}
@Override
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
final MessageReference node) throws IOException {
if (topicStore != null && node.isPersistent()) {
@ -532,6 +542,7 @@ public class Topic extends BaseDestination implements Task {
messageConsumed(context, node);
}
@Override
public void gc() {
}
@ -539,6 +550,7 @@ public class Topic extends BaseDestination implements Task {
return topicStore != null ? topicStore.getMessage(messageId) : null;
}
@Override
public void start() throws Exception {
this.subscriptionRecoveryPolicy.start();
if (memoryUsage != null) {
@ -550,6 +562,7 @@ public class Topic extends BaseDestination implements Task {
}
}
@Override
public void stop() throws Exception {
if (taskRunner != null) {
taskRunner.shutdown();
@ -565,6 +578,7 @@ public class Topic extends BaseDestination implements Task {
scheduler.cancel(expireMessagesTask);
}
@Override
public Message[] browse() {
final List<Message> result = new ArrayList<Message>();
doBrowse(result, getMaxBrowsePageSize());
@ -576,6 +590,7 @@ public class Topic extends BaseDestination implements Task {
if (topicStore != null) {
final List<Message> toExpire = new ArrayList<Message>();
topicStore.recover(new MessageRecoveryListener() {
@Override
public boolean recoverMessage(Message message) throws Exception {
if (message.isExpired()) {
toExpire.add(message);
@ -584,14 +599,17 @@ public class Topic extends BaseDestination implements Task {
return true;
}
@Override
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
return true;
}
@Override
public boolean hasSpace() {
return browseList.size() < max;
}
@Override
public boolean isDuplicate(MessageId id) {
return false;
}
@ -616,6 +634,7 @@ public class Topic extends BaseDestination implements Task {
}
}
@Override
public boolean iterate() {
synchronized (messagesWaitingForSpace) {
while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
@ -661,6 +680,7 @@ public class Topic extends BaseDestination implements Task {
// Implementation methods
// -------------------------------------------------------------------------
@Override
public final void wakeup() {
}
@ -698,12 +718,14 @@ public class Topic extends BaseDestination implements Task {
}
private final Runnable expireMessagesTask = new Runnable() {
@Override
public void run() {
List<Message> browsedMessages = new InsertionCountList<Message>();
doBrowse(browsedMessages, getMaxExpirePageSize());
}
};
@Override
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
broker.messageExpired(context, reference, subs);
// AMQ-2586: Better to leave this stat at zero than to give the user
@ -760,6 +782,7 @@ public class Topic extends BaseDestination implements Task {
/**
* force a reread of the store - after transaction recovery completion
*/
@Override
public void clearPendingMessages() {
dispatchLock.readLock().lock();
try {