From cf3d419528e5596c1226b5ca11ed37e8039bc006 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Fri, 18 Mar 2016 16:31:18 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6375 Adding non-blocking metrics to KahaDB to track message counts and sizes for subscriptions. Turned off by default but can be enabled on the KahaDBPersistenceAdapter --- .../region/cursors/TopicStorePrefetch.java | 1 - .../store/AbstractMessageStoreStatistics.java | 81 ++++++++ .../MessageStoreSubscriptionStatistics.java | 112 ++++++++++ .../store/ProxyTopicMessageStore.java | 4 + .../activemq/store/TopicMessageStore.java | 17 ++ .../store/memory/MemoryTopicMessageStore.java | 9 + .../store/jdbc/JDBCTopicMessageStore.java | 8 + .../journal/JournalTopicMessageStore.java | 7 + .../kahadb/KahaDBPersistenceAdapter.java | 20 ++ .../activemq/store/kahadb/KahaDBStore.java | 121 ++++++++--- .../store/kahadb/MessageDatabase.java | 106 +++++++++- .../store/kahadb/TempKahaDBStore.java | 8 + .../AbstractPendingMessageCursorTest.java | 53 +++-- .../KahaDBPendingMessageCursorTest.java | 196 +++++++++++++++++- .../MemoryPendingMessageCursorTest.java | 24 ++- .../MultiKahaDBPendingMessageCursorTest.java | 6 +- 16 files changed, 701 insertions(+), 72 deletions(-) create mode 100644 activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStoreStatistics.java create mode 100644 activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreSubscriptionStatistics.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index 35ec3edbb4..bfc745ed9b 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -70,7 +70,6 @@ class TopicStorePrefetch extends AbstractStoreCursor { batchList.addMessageFirst(node); size++; node.incrementReferenceCount(); - //this.messageSize.addSize(node.getMessage().getSize()); } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStoreStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStoreStatistics.java new file mode 100644 index 0000000000..249b767285 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStoreStatistics.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store; + +import org.apache.activemq.management.CountStatisticImpl; +import org.apache.activemq.management.SizeStatisticImpl; +import org.apache.activemq.management.StatsImpl; + + +public abstract class AbstractMessageStoreStatistics extends StatsImpl { + + protected final CountStatisticImpl messageCount; + protected final SizeStatisticImpl messageSize; + + + protected AbstractMessageStoreStatistics(String countDescription, String sizeDescription) { + this(true, countDescription, sizeDescription); + } + + protected AbstractMessageStoreStatistics(boolean enabled, String countDescription, String sizeDescription) { + + messageCount = new CountStatisticImpl("messageCount", countDescription); + messageSize = new SizeStatisticImpl("messageSize", sizeDescription); + + addStatistic("messageCount", messageCount); + addStatistic("messageSize", messageSize); + + this.setEnabled(enabled); + } + + + public CountStatisticImpl getMessageCount() { + return messageCount; + } + + public SizeStatisticImpl getMessageSize() { + return messageSize; + } + + @Override + public void reset() { + if (this.isDoReset()) { + super.reset(); + messageCount.reset(); + messageSize.reset(); + } + } + + @Override + public void setEnabled(boolean enabled) { + super.setEnabled(enabled); + messageCount.setEnabled(enabled); + messageSize.setEnabled(enabled); + } + + public void setParent(AbstractMessageStoreStatistics parent) { + if (parent != null) { + messageCount.setParent(parent.messageCount); + messageSize.setParent(parent.messageSize); + } else { + messageCount.setParent(null); + messageSize.setParent(null); + } + } + +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreSubscriptionStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreSubscriptionStatistics.java new file mode 100644 index 0000000000..d246957308 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreSubscriptionStatistics.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.activemq.management.CountStatisticImpl; +import org.apache.activemq.management.SizeStatisticImpl; + +public class MessageStoreSubscriptionStatistics extends AbstractMessageStoreStatistics { + + private final ConcurrentMap subStatistics + = new ConcurrentHashMap<>(); + + /** + * @param enabled + * @param countDescription + * @param sizeDescription + */ + public MessageStoreSubscriptionStatistics(boolean enabled) { + super(enabled, "The number of messages or this subscription in the message store", + "Size of messages contained by this subscription in the message store"); + } + + /** + * Total count for all subscriptions + */ + @Override + public CountStatisticImpl getMessageCount() { + return this.messageCount; + } + + /** + * Total size for all subscriptions + */ + @Override + public SizeStatisticImpl getMessageSize() { + return this.messageSize; + } + + public CountStatisticImpl getMessageCount(String subKey) { + return getOrInitStatistics(subKey).getMessageCount(); + } + + public SizeStatisticImpl getMessageSize(String subKey) { + return getOrInitStatistics(subKey).getMessageSize(); + } + + public void removeSubscription(String subKey) { + SubscriptionStatistics subStats = subStatistics.remove(subKey); + //Subtract from the parent + if (subStats != null) { + getMessageCount().subtract(subStats.getMessageCount().getCount()); + getMessageSize().addSize(-subStats.getMessageSize().getTotalSize()); + } + } + + @Override + public void reset() { + super.reset(); + subStatistics.clear(); + } + + private SubscriptionStatistics getOrInitStatistics(String subKey) { + SubscriptionStatistics subStats = subStatistics.get(subKey); + + if (subStats == null) { + final SubscriptionStatistics stats = new SubscriptionStatistics(); + subStats = subStatistics.putIfAbsent(subKey, stats); + if (subStats == null) { + subStats = stats; + } + } + + return subStats; + } + + private class SubscriptionStatistics extends AbstractMessageStoreStatistics { + + public SubscriptionStatistics() { + this(MessageStoreSubscriptionStatistics.this.enabled); + } + + /** + * @param enabled + * @param countDescription + * @param sizeDescription + */ + public SubscriptionStatistics(boolean enabled) { + super(enabled, "The number of messages or this subscription in the message store", + "Size of messages contained by this subscription in the message store"); + this.setParent(MessageStoreSubscriptionStatistics.this); + } + } + +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java index 6e79358fd8..b9b79c9c3a 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java @@ -233,4 +233,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore { return delegate.getMessageSize(clientId, subscriberName); } + @Override + public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { + return delegate.getMessageStoreSubStatistics(); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java index a55118faa9..1d126c2bc3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java @@ -102,8 +102,25 @@ public interface TopicMessageStore extends MessageStore { */ int getMessageCount(String clientId, String subscriberName) throws IOException; + /** + * Get the total size of the messages ready to deliver from the store to the + * durable subscriber + * + * @param clientId + * @param subscriberName + * @return + * @throws IOException + */ long getMessageSize(String clientId, String subscriberName) throws IOException; + /** + * The subscription metrics contained in this store + * + * @param subscriptionKey + * @return + */ + MessageStoreSubscriptionStatistics getMessageStoreSubStatistics(); + /** * Finds the subscriber entry for the given consumer info * diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java index ae693f1ac8..308ca59b4b 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java @@ -31,6 +31,7 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStoreStatistics; +import org.apache.activemq.store.MessageStoreSubscriptionStatistics; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.util.LRUCache; import org.apache.activemq.util.SubscriptionKey; @@ -171,6 +172,14 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic } } + //Disabled for the memory store, can be enabled later if necessary + private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false); + + @Override + public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { + return stats; + } + /** * Since we initialize the store with a LRUCache in some cases, we need to account for cache evictions * when computing the message store statistics. diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index 3bff9b254a..3c8ba54d5d 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -36,6 +36,7 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStoreSubscriptionStatistics; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOExceptionSupport; @@ -434,4 +435,11 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess return result; } + private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false); + + @Override + public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { + return stats; + } + } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java index aa0cb5d2b8..c083482307 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java @@ -29,6 +29,7 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStoreSubscriptionStatistics; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.util.Callback; @@ -230,4 +231,10 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top longTermStore.resetBatching(clientId, subscriptionName); } + private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false); + + @Override + public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { + return stats; + } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 21cac0ab85..e5d268c571 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -687,6 +687,26 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements letter.setEnableAckCompaction(enableAckCompaction); } + /** + * Whether non-blocking subscription statistics have been enabled + * + * @return + */ + public boolean isEnableSubscriptionStatistics() { + return letter.isEnableSubscriptionStatistics(); + } + + /** + * Enable caching statistics for each subscription to allow non-blocking + * retrieval of metrics. This could incur some overhead to compute if there are a lot + * of subscriptions. + * + * @param enableSubscriptionStatistics + */ + public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) { + letter.setEnableSubscriptionStatistics(enableSubscriptionStatistics); + } + public KahaDBStore getStore() { return letter; } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 7f8283d61d..9ed738159f 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -62,6 +62,7 @@ import org.apache.activemq.store.ListenableFuture; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStoreStatistics; +import org.apache.activemq.store.MessageStoreSubscriptionStatistics; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionIdTransformer; @@ -742,6 +743,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { private final AtomicInteger subscriptionCount = new AtomicInteger(); + protected final MessageStoreSubscriptionStatistics messageStoreSubStats = + new MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics()); + public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { super(destination); this.subscriptionCount.set(getAllSubscriptions().length); @@ -750,6 +754,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } + @Override + protected void recoverMessageStoreStatistics() throws IOException { + super.recoverMessageStoreStatistics(); + this.recoverMessageStoreSubMetrics(); + } + @Override public ListenableFuture asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException { @@ -884,23 +894,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public int getMessageCount(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - indexLock.writeLock().lock(); - try { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public Integer execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); - if (cursorPos == null) { - // The subscription might not exist. - return 0; - } - return (int) getStoredMessageCount(tx, sd, subscriptionKey); - } - }); - } finally { - indexLock.writeLock().unlock(); + if (isEnableSubscriptionStatistics()) { + return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount(); + } else { + + indexLock.writeLock().lock(); + try { + return pageFile.tx().execute(new Transaction.CallableClosure() { + @Override + public Integer execute(Transaction tx) throws IOException { + StoredDestination sd = getStoredDestination(dest, tx); + LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); + if (cursorPos == null) { + // The subscription might not exist. + return 0; + } + + return (int) getStoredMessageCount(tx, sd, subscriptionKey); + } + }); + } finally { + indexLock.writeLock().unlock(); + } } } @@ -908,23 +924,59 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public long getMessageSize(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - indexLock.writeLock().lock(); - try { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public Integer execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); - if (cursorPos == null) { - // The subscription might not exist. - return 0; - } + if (isEnableSubscriptionStatistics()) { + return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize(); + } else { + indexLock.writeLock().lock(); + try { + return pageFile.tx().execute(new Transaction.CallableClosure() { + @Override + public Integer execute(Transaction tx) throws IOException { + StoredDestination sd = getStoredDestination(dest, tx); + LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); + if (cursorPos == null) { + // The subscription might not exist. + return 0; + } - return (int) getStoredMessageSize(tx, sd, subscriptionKey); - } - }); - } finally { - indexLock.writeLock().unlock(); + return (int) getStoredMessageSize(tx, sd, subscriptionKey); + } + }); + } finally { + indexLock.writeLock().unlock(); + } + } + } + + + protected void recoverMessageStoreSubMetrics() throws IOException { + if (isEnableSubscriptionStatistics()) { + + final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics(); + indexLock.writeLock().lock(); + try { + pageFile.tx().execute(new Transaction.Closure() { + @Override + public void execute(Transaction tx) throws IOException { + StoredDestination sd = getStoredDestination(dest, tx); + for (Iterator> iterator = sd.subscriptions + .iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + + String subscriptionKey = entry.getKey(); + LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); + if (cursorPos != null) { + long size = getStoredMessageSize(tx, sd, subscriptionKey); + statistics.getMessageCount(subscriptionKey) + .setCount(getStoredMessageCount(tx, sd, subscriptionKey)); + statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0); + } + } + } + }); + } finally { + indexLock.writeLock().unlock(); + } } } @@ -1032,6 +1084,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { throw new RuntimeException(e); } } + + @Override + public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { + return messageStoreSubStats; + } } String subscriptionKey(String clientId, String subscriptionName) { diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 365de7d58b..ef806e3c45 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -70,6 +70,8 @@ import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStoreStatistics; +import org.apache.activemq.store.MessageStoreSubscriptionStatistics; +import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; @@ -282,6 +284,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private boolean compactAcksIgnoresStoreGrowth = false; private int checkPointCyclesWithNoGC; private int journalLogOnLastCompactionCheck; + private boolean enableSubscriptionStatistics = false; @Override public void doStart() throws Exception { @@ -1420,7 +1423,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { - addAckLocationForNewMessage(tx, sd, id); + addAckLocationForNewMessage(tx, command.getDestination(), sd, id); } metadata.lastUpdate = location; } else { @@ -1482,6 +1485,18 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe //Remove the existing from the size decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize()); + //update all the subscription metrics + if (enableSubscriptionStatistics && location.getSize() != previousKeys.location.getSize()) { + Iterator> iter = sd.ackPositions.iterator(tx); + while (iter.hasNext()) { + Entry e = iter.next(); + if (e.getValue().contains(id)) { + incrementAndAddSizeToStoreStat(key(command.getDestination()), e.getKey(), location.getSize()); + decrementAndSubSizeToStoreStat(key(command.getDestination()), e.getKey(), previousKeys.location.getSize()); + } + } + } + // on first update previous is original location, on recovery/replay it may be the updated location if(!previousKeys.location.equals(location)) { sd.locationIndex.remove(tx, previousKeys.location); @@ -1622,6 +1637,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.subscriptionAcks.remove(tx, subscriptionKey); sd.subscriptionCache.remove(subscriptionKey); removeAckLocationsForSub(command, tx, sd, subscriptionKey); + MessageStoreSubscriptionStatistics subStats = getSubStats(key(command.getDestination())); + if (subStats != null) { + subStats.removeSubscription(subscriptionKey); + } if (sd.subscriptions.isEmpty(tx)) { // remove the stored destination @@ -2565,10 +2584,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe * @param kahaDestination */ protected void clearStoreStats(KahaDestination kahaDestination) { - MessageStoreStatistics storeStats = getStoreStats(key(kahaDestination)); + String key = key(kahaDestination); + MessageStoreStatistics storeStats = getStoreStats(key); + MessageStoreSubscriptionStatistics subStats = getSubStats(key); if (storeStats != null) { storeStats.reset(); } + if (subStats != null) { + subStats.reset(); + } } /** @@ -2605,8 +2629,41 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) { + incrementAndAddSizeToStoreStat(key(kahaDestination), subKey, size); + } + + protected void incrementAndAddSizeToStoreStat(String kahaDestKey, String subKey, long size) { + if (enableSubscriptionStatistics) { + MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey); + if (subStats != null && subKey != null) { + subStats.getMessageCount(subKey).increment(); + if (size > 0) { + subStats.getMessageSize(subKey).addSize(size); + } + } + } + } + + + protected void decrementAndSubSizeToStoreStat(String kahaDestKey, String subKey, long size) { + if (enableSubscriptionStatistics) { + MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey); + if (subStats != null && subKey != null) { + subStats.getMessageCount(subKey).decrement(); + if (size > 0) { + subStats.getMessageSize(subKey).addSize(-size); + } + } + } + } + + protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) { + decrementAndSubSizeToStoreStat(key(kahaDestination), subKey, size); + } + /** - * This is a map to cache DestinationStatistics for a specific + * This is a map to cache MessageStores for a specific * KahaDestination key */ protected final ConcurrentMap storeCache = @@ -2631,6 +2688,20 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return storeStats; } + protected MessageStoreSubscriptionStatistics getSubStats(String kahaDestKey) { + MessageStoreSubscriptionStatistics subStats = null; + try { + MessageStore messageStore = storeCache.get(kahaDestKey); + if (messageStore instanceof TopicMessageStore) { + subStats = ((TopicMessageStore)messageStore).getMessageStoreSubStatistics(); + } + } catch (Exception e1) { + LOG.error("Getting size counter of destination failed", e1); + } + + return subStats; + } + /** * Determine whether this Destination matches the DestinationType * @@ -2736,7 +2807,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } // on a new message add, all existing subs are interested in this message - private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { + private void addAckLocationForNewMessage(Transaction tx, KahaDestination kahaDest, + StoredDestination sd, Long messageSequence) throws IOException { for(String subscriptionKey : sd.subscriptionCache) { SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); if (sequences == null) { @@ -2748,6 +2820,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.ackPositions.put(tx, subscriptionKey, sequences); } + MessageKeys key = sd.orderIndex.get(tx, messageSequence); + incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey, + key.location.getSize()); + Long count = sd.messageReferences.get(messageSequence); if (count == null) { count = Long.valueOf(0L); @@ -2819,6 +2895,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.ackPositions.remove(tx, subscriptionKey); } + MessageKeys key = sd.orderIndex.get(tx, messageSequence); + decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey, + key.location.getSize()); + // Check if the message is reference by any other subscription. Long count = sd.messageReferences.get(messageSequence); if (count != null) { @@ -3786,4 +3866,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void setEnableAckCompaction(boolean enableAckCompaction) { this.enableAckCompaction = enableAckCompaction; } + + /** + * @return + */ + public boolean isEnableSubscriptionStatistics() { + return enableSubscriptionStatistics; + } + + /** + * Enable caching statistics for each subscription to allow non-blocking + * retrieval of metrics. This could incur some overhead to compute if there are a lot + * of subscriptions. + * + * @param enableSubscriptionStatistics + */ + public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) { + this.enableSubscriptionStatistics = enableSubscriptionStatistics; + } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java index 920fc53303..96869132f1 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java @@ -46,6 +46,7 @@ import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.store.AbstractMessageStore; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.MessageStoreSubscriptionStatistics; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionRecoveryListener; @@ -413,6 +414,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA return 0; } + private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false); + + @Override + public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { + return stats; + } + @Override public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java index ba17dacab4..645fb2ddd2 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java @@ -21,8 +21,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.URI; -import java.util.Arrays; -import java.util.Collection; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; @@ -49,9 +48,11 @@ import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait.Condition; import org.junit.After; +import org.junit.Assume; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; -import org.junit.runners.Parameterized.Parameters; +import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,22 +72,16 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat protected String defaultQueueName = "test.queue"; protected String defaultTopicName = "test.topic"; protected static int maxMessageSize = 1000; - protected boolean prioritizedMessages; + protected final boolean prioritizedMessages; + protected boolean enableSubscriptionStatistics; - @Parameters(name="prioritizedMessages={0}") - public static Collection data() { - return Arrays.asList(new Object[][] { - // use priority messages - {true}, - // don't use priority messages - {false} - }); - } + @Rule + public Timeout globalTimeout= new Timeout(60, TimeUnit.SECONDS); /** * @param prioritizedMessages */ - public AbstractPendingMessageCursorTest(boolean prioritizedMessages) { + public AbstractPendingMessageCursorTest(final boolean prioritizedMessages) { super(); this.prioritizedMessages = prioritizedMessages; } @@ -144,6 +139,9 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat @Test public void testQueueMessageSize() throws Exception { + //doesn't apply to queues, only run once + Assume.assumeFalse(enableSubscriptionStatistics); + AtomicLong publishedMessageSize = new AtomicLong(); org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize); @@ -153,6 +151,9 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat @Test public void testQueueBrowserMessageSize() throws Exception { + //doesn't apply to queues, only run once + Assume.assumeFalse(enableSubscriptionStatistics); + AtomicLong publishedMessageSize = new AtomicLong(); org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize); @@ -163,6 +164,9 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat @Test public void testQueueMessageSizeNonPersistent() throws Exception { + //doesn't apply to queues, only run once + Assume.assumeFalse(enableSubscriptionStatistics); + AtomicLong publishedMessageSize = new AtomicLong(); org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, @@ -172,6 +176,9 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat @Test public void testQueueMessageSizePersistentAndNonPersistent() throws Exception { + //doesn't apply to queues, only run once + Assume.assumeFalse(enableSubscriptionStatistics); + AtomicLong publishedNonPersistentMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong(); @@ -185,6 +192,9 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat @Test public void testQueueMessageSizeAfterConsumption() throws Exception { + //doesn't apply to queues, only run once + Assume.assumeFalse(enableSubscriptionStatistics); + AtomicLong publishedMessageSize = new AtomicLong(); org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize); @@ -198,6 +208,9 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat @Test public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception { + //doesn't apply to queues, only run once + Assume.assumeFalse(enableSubscriptionStatistics); + AtomicLong publishedMessageSize = new AtomicLong(); org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize); @@ -209,7 +222,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat verifyStoreStats(dest, 0, 0); } - @Test(timeout=60000) + @Test public void testTopicMessageSize() throws Exception { AtomicLong publishedMessageSize = new AtomicLong(); @@ -235,7 +248,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat connection.close(); } - @Test(timeout=60000) + @Test public void testTopicNonPersistentMessageSize() throws Exception { AtomicLong publishedMessageSize = new AtomicLong(); @@ -262,7 +275,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat connection.close(); } - @Test(timeout=60000) + @Test public void testTopicPersistentAndNonPersistentMessageSize() throws Exception { AtomicLong publishedMessageSize = new AtomicLong(); @@ -291,7 +304,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat connection.close(); } - @Test(timeout=60000) + @Test public void testMessageSizeOneDurable() throws Exception { AtomicLong publishedMessageSize = new AtomicLong(); Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); @@ -320,7 +333,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat connection.close(); } - @Test(timeout=60000) + @Test public void testMessageSizeOneDurablePartialConsumption() throws Exception { AtomicLong publishedMessageSize = new AtomicLong(); @@ -346,7 +359,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat connection.close(); } - @Test(timeout=60000) + @Test public void testMessageSizeTwoDurables() throws Exception { AtomicLong publishedMessageSize = new AtomicLong(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java index 5cefc1b778..8fd42e275e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java @@ -17,47 +17,78 @@ package org.apache.activemq.broker.region.cursors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.MessageStoreSubscriptionStatistics; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.util.SubscriptionKey; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; import org.apache.commons.io.FileUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This test checks that pending message metrics work properly with KahaDB * - * AMQ-5923 + * AMQ-5923, AMQ-6375 * */ @RunWith(Parameterized.class) public class KahaDBPendingMessageCursorTest extends AbstractPendingMessageCursorTest { + protected static final Logger LOG = LoggerFactory .getLogger(KahaDBPendingMessageCursorTest.class); + @Parameters(name = "prioritizedMessages={0},enableSubscriptionStatistics={1}") + public static Collection data() { + return Arrays.asList(new Object[][] { + // use priority messages + { true, true }, + { true, false }, + // don't use priority messages + { false, true }, + { false, false } + }); + } + @Rule public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); /** * @param prioritizedMessages */ - public KahaDBPendingMessageCursorTest(boolean prioritizedMessages) { + public KahaDBPendingMessageCursorTest(final boolean prioritizedMessages, + final boolean enableSubscriptionStatistics) { super(prioritizedMessages); + this.enableSubscriptionStatistics = enableSubscriptionStatistics; } @Override @@ -71,7 +102,10 @@ public class KahaDBPendingMessageCursorTest extends protected void initPersistence(BrokerService brokerService) throws IOException { broker.setPersistent(true); - broker.setDataDirectoryFile(dataFileDir.getRoot()); + KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(dataFileDir.getRoot()); + persistenceAdapter.setEnableSubscriptionStatistics(enableSubscriptionStatistics); + broker.setPersistenceAdapter(persistenceAdapter); } /** @@ -80,7 +114,7 @@ public class KahaDBPendingMessageCursorTest extends * * @throws Exception */ - @Test(timeout=60000) + @Test public void testDurableMessageSizeAfterRestartAndPublish() throws Exception { AtomicLong publishedMessageSize = new AtomicLong(); @@ -97,13 +131,18 @@ public class KahaDBPendingMessageCursorTest extends verifyStoreStats(topic, 200, publishedMessageSize.get()); //should be equal in this case - assertEquals(topic.getDurableTopicSubs().get(subKey).getPendingMessageSize(), + long beforeRestartSize = topic.getDurableTopicSubs().get(subKey).getPendingMessageSize(); + assertEquals(beforeRestartSize, topic.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize()); // stop, restart broker and publish more messages stopBroker(); this.setUpBroker(false); + //verify that after restart the size is the same as before restart on recovery + topic = (Topic) getBroker().getDestination(new ActiveMQTopic(defaultTopicName)); + assertEquals(beforeRestartSize, topic.getDurableTopicSubs().get(subKey).getPendingMessageSize()); + connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); connection.setClientID("clientId"); connection.start(); @@ -117,7 +156,7 @@ public class KahaDBPendingMessageCursorTest extends } - @Test(timeout=60000) + @Test public void testMessageSizeTwoDurablesPartialConsumption() throws Exception { AtomicLong publishedMessageSize = new AtomicLong(); @@ -153,7 +192,7 @@ public class KahaDBPendingMessageCursorTest extends * * @throws Exception */ - @Test(timeout=60000) + @Test public void testNonPersistentDurableMessageSize() throws Exception { AtomicLong publishedMessageSize = new AtomicLong(); @@ -170,4 +209,147 @@ public class KahaDBPendingMessageCursorTest extends verifyStoreStats(topic, 0, 0); } + /** + * Test that the subscription counters are properly set when enabled + * and not set when disabled + * + * @throws Exception + */ + @Test + public void testEnabledSubscriptionStatistics() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + + SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1"); + SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2"); + org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable( + connection, new String[] {"sub1", "sub2"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT); + + TopicMessageStore store = (TopicMessageStore) dest.getMessageStore(); + MessageStoreSubscriptionStatistics stats = store.getMessageStoreSubStatistics(); + if (enableSubscriptionStatistics) { + assertTrue(stats.getMessageCount(subKey.toString()).getCount() == 200); + assertTrue(stats.getMessageSize(subKey.toString()).getTotalSize() > 0); + assertTrue(stats.getMessageCount(subKey2.toString()).getCount() == 200); + assertTrue(stats.getMessageSize(subKey2.toString()).getTotalSize() > 0); + assertEquals(stats.getMessageCount().getCount(), + stats.getMessageCount(subKey.toString()).getCount() + + stats.getMessageSize(subKey.toString()).getCount()); + assertEquals(stats.getMessageSize().getTotalSize(), + stats.getMessageSize(subKey.toString()).getTotalSize() + + stats.getMessageSize(subKey2.toString()).getTotalSize()); + + //Delete second subscription and verify stats are updated accordingly + store.deleteSubscription(subKey2.getClientId(), subKey2.getSubscriptionName()); + assertEquals(stats.getMessageCount().getCount(), stats.getMessageCount(subKey.toString()).getCount()); + assertEquals(stats.getMessageSize().getTotalSize(), stats.getMessageSize(subKey.toString()).getTotalSize()); + assertTrue(stats.getMessageCount(subKey2.toString()).getCount() == 0); + assertTrue(stats.getMessageSize(subKey2.toString()).getTotalSize() == 0); + + } else { + assertTrue(stats.getMessageCount(subKey.toString()).getCount() == 0); + assertTrue(stats.getMessageSize(subKey.toString()).getTotalSize() == 0); + assertTrue(stats.getMessageCount(subKey2.toString()).getCount() == 0); + assertTrue(stats.getMessageSize(subKey2.toString()).getTotalSize() == 0); + assertEquals(0, stats.getMessageCount().getCount()); + assertEquals(0, stats.getMessageSize().getTotalSize()); + } + + } + + @Test + public void testUpdateMessageSubSize() throws Exception { + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + Session session = connection.createSession(false, TopicSession.AUTO_ACKNOWLEDGE); + javax.jms.Topic dest = session.createTopic(defaultTopicName); + session.createDurableSubscriber(dest, "sub1"); + session.createDurableSubscriber(dest, "sub2"); + MessageProducer prod = session.createProducer(dest); + + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText("SmallMessage"); + prod.send(message); + + SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1"); + SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub1"); + + final Topic topic = (Topic) getBroker().getDestination(new ActiveMQTopic(defaultTopicName)); + final DurableTopicSubscription sub = topic.getDurableTopicSubs().get(subKey); + final DurableTopicSubscription sub2 = topic.getDurableTopicSubs().get(subKey2); + long sizeBeforeUpdate = sub.getPendingMessageSize(); + + message = (ActiveMQTextMessage) topic.getMessageStore().getMessage(message.getMessageId()); + message.setText("LargerMessageLargerMessage"); + + //update the message + topic.getMessageStore().updateMessage(message); + + //should be at least 10 bytes bigger and match the store size + assertTrue(sub.getPendingMessageSize() > sizeBeforeUpdate + 10); + assertEquals(sub.getPendingMessageSize(), topic.getMessageStore().getMessageSize()); + assertEquals(sub.getPendingMessageSize(), sub2.getPendingMessageSize()); + } + + @Test + public void testUpdateMessageSubSizeAfterConsume() throws Exception { + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + Session session = connection.createSession(false, TopicSession.AUTO_ACKNOWLEDGE); + javax.jms.Topic dest = session.createTopic(defaultTopicName); + session.createDurableSubscriber(dest, "sub1"); + TopicSubscriber subscriber2 = session.createDurableSubscriber(dest, "sub2"); + MessageProducer prod = session.createProducer(dest); + + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText("SmallMessage"); + ActiveMQTextMessage message2 = new ActiveMQTextMessage(); + message2.setText("SmallMessage2"); + prod.send(message); + prod.send(message2); + + //Receive first message for sub 2 and wait for stats to update + subscriber2.receive(); + + SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1"); + SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2"); + final Topic topic = (Topic) getBroker().getDestination(new ActiveMQTopic(defaultTopicName)); + final DurableTopicSubscription sub = topic.getDurableTopicSubs().get(subKey); + final DurableTopicSubscription sub2 = topic.getDurableTopicSubs().get(subKey2); + + Wait.waitFor(new Condition() { + + @Override + public boolean isSatisified() throws Exception { + return sub.getPendingMessageSize() > sub2.getPendingMessageSize(); + } + }); + + long sizeBeforeUpdate = sub.getPendingMessageSize(); + long sizeBeforeUpdate2 = sub2.getPendingMessageSize(); + + //update message 2 + message = (ActiveMQTextMessage) topic.getMessageStore().getMessage(message.getMessageId()); + message.setText("LargerMessageLargerMessage"); + + //update the message + topic.getMessageStore().updateMessage(message); + + //should be at least 10 bytes bigger and match the store size + assertTrue(sub.getPendingMessageSize() > sizeBeforeUpdate + 10); + assertEquals(sub.getPendingMessageSize(), topic.getMessageStore().getMessageSize()); + + //Sub2 only has 1 message so should be less than sub, verify that the update message + //didn't update the stats of sub2 and sub1 should be over twice as large since the + //updated message is bigger + assertTrue(sub.getPendingMessageSize() > 2 * sub2.getPendingMessageSize()); + assertEquals(sizeBeforeUpdate2, sub2.getPendingMessageSize()); + + } + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java index 257bbce251..985fb94821 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java @@ -17,6 +17,8 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; @@ -29,6 +31,7 @@ import org.apache.activemq.util.SubscriptionKey; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,9 +47,18 @@ public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursor .getLogger(MemoryPendingMessageCursorTest.class); - public MemoryPendingMessageCursorTest(boolean prioritizedMessages) { - super(prioritizedMessages); - } + @Parameters(name = "prioritizedMessages={0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + // use priority messages + { true }, + // don't use priority messages + { false } }); + } + + public MemoryPendingMessageCursorTest(boolean prioritizedMessages) { + super(prioritizedMessages); + } @Override protected void initPersistence(BrokerService brokerService) throws IOException { @@ -56,7 +68,7 @@ public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursor @Override - @Test(timeout=30000) + @Test public void testMessageSizeOneDurable() throws Exception { AtomicLong publishedMessageSize = new AtomicLong(); Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); @@ -85,7 +97,7 @@ public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursor } @Override - @Test(timeout=30000) + @Test public void testMessageSizeTwoDurables() throws Exception { AtomicLong publishedMessageSize = new AtomicLong(); @@ -117,7 +129,7 @@ public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursor } @Override - @Test(timeout=30000) + @Test public void testMessageSizeOneDurablePartialConsumption() throws Exception { AtomicLong publishedMessageSize = new AtomicLong(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java index 9d8755fbfc..dbc6d3c63b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java @@ -37,8 +37,9 @@ public class MultiKahaDBPendingMessageCursorTest extends /** * @param prioritizedMessages */ - public MultiKahaDBPendingMessageCursorTest(boolean prioritizedMessages) { - super(prioritizedMessages); + public MultiKahaDBPendingMessageCursorTest(final boolean prioritizedMessages, + final boolean enableSubscriptionStatistics) { + super(prioritizedMessages, enableSubscriptionStatistics); } @Override @@ -52,6 +53,7 @@ public class MultiKahaDBPendingMessageCursorTest extends KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); kahaStore.setJournalMaxFileLength(1024 * 512); + kahaStore.setEnableSubscriptionStatistics(enableSubscriptionStatistics); //set up a store per destination FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();