Adding non-blocking metrics to KahaDB to track message counts and sizes
for subscriptions.  Turned off by default but can be enabled on the
KahaDBPersistenceAdapter
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-03-18 16:31:18 +00:00
parent 2985651609
commit cf3d419528
16 changed files with 701 additions and 72 deletions

View File

@ -70,7 +70,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
batchList.addMessageFirst(node); batchList.addMessageFirst(node);
size++; size++;
node.incrementReferenceCount(); node.incrementReferenceCount();
//this.messageSize.addSize(node.getMessage().getSize());
} }
@Override @Override

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.SizeStatisticImpl;
import org.apache.activemq.management.StatsImpl;
public abstract class AbstractMessageStoreStatistics extends StatsImpl {
protected final CountStatisticImpl messageCount;
protected final SizeStatisticImpl messageSize;
protected AbstractMessageStoreStatistics(String countDescription, String sizeDescription) {
this(true, countDescription, sizeDescription);
}
protected AbstractMessageStoreStatistics(boolean enabled, String countDescription, String sizeDescription) {
messageCount = new CountStatisticImpl("messageCount", countDescription);
messageSize = new SizeStatisticImpl("messageSize", sizeDescription);
addStatistic("messageCount", messageCount);
addStatistic("messageSize", messageSize);
this.setEnabled(enabled);
}
public CountStatisticImpl getMessageCount() {
return messageCount;
}
public SizeStatisticImpl getMessageSize() {
return messageSize;
}
@Override
public void reset() {
if (this.isDoReset()) {
super.reset();
messageCount.reset();
messageSize.reset();
}
}
@Override
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
messageCount.setEnabled(enabled);
messageSize.setEnabled(enabled);
}
public void setParent(AbstractMessageStoreStatistics parent) {
if (parent != null) {
messageCount.setParent(parent.messageCount);
messageSize.setParent(parent.messageSize);
} else {
messageCount.setParent(null);
messageSize.setParent(null);
}
}
}

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.SizeStatisticImpl;
public class MessageStoreSubscriptionStatistics extends AbstractMessageStoreStatistics {
private final ConcurrentMap<String, SubscriptionStatistics> subStatistics
= new ConcurrentHashMap<>();
/**
* @param enabled
* @param countDescription
* @param sizeDescription
*/
public MessageStoreSubscriptionStatistics(boolean enabled) {
super(enabled, "The number of messages or this subscription in the message store",
"Size of messages contained by this subscription in the message store");
}
/**
* Total count for all subscriptions
*/
@Override
public CountStatisticImpl getMessageCount() {
return this.messageCount;
}
/**
* Total size for all subscriptions
*/
@Override
public SizeStatisticImpl getMessageSize() {
return this.messageSize;
}
public CountStatisticImpl getMessageCount(String subKey) {
return getOrInitStatistics(subKey).getMessageCount();
}
public SizeStatisticImpl getMessageSize(String subKey) {
return getOrInitStatistics(subKey).getMessageSize();
}
public void removeSubscription(String subKey) {
SubscriptionStatistics subStats = subStatistics.remove(subKey);
//Subtract from the parent
if (subStats != null) {
getMessageCount().subtract(subStats.getMessageCount().getCount());
getMessageSize().addSize(-subStats.getMessageSize().getTotalSize());
}
}
@Override
public void reset() {
super.reset();
subStatistics.clear();
}
private SubscriptionStatistics getOrInitStatistics(String subKey) {
SubscriptionStatistics subStats = subStatistics.get(subKey);
if (subStats == null) {
final SubscriptionStatistics stats = new SubscriptionStatistics();
subStats = subStatistics.putIfAbsent(subKey, stats);
if (subStats == null) {
subStats = stats;
}
}
return subStats;
}
private class SubscriptionStatistics extends AbstractMessageStoreStatistics {
public SubscriptionStatistics() {
this(MessageStoreSubscriptionStatistics.this.enabled);
}
/**
* @param enabled
* @param countDescription
* @param sizeDescription
*/
public SubscriptionStatistics(boolean enabled) {
super(enabled, "The number of messages or this subscription in the message store",
"Size of messages contained by this subscription in the message store");
this.setParent(MessageStoreSubscriptionStatistics.this);
}
}
}

View File

@ -233,4 +233,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
return delegate.getMessageSize(clientId, subscriberName); return delegate.getMessageSize(clientId, subscriberName);
} }
@Override
public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
return delegate.getMessageStoreSubStatistics();
}
} }

View File

@ -102,8 +102,25 @@ public interface TopicMessageStore extends MessageStore {
*/ */
int getMessageCount(String clientId, String subscriberName) throws IOException; int getMessageCount(String clientId, String subscriberName) throws IOException;
/**
* Get the total size of the messages ready to deliver from the store to the
* durable subscriber
*
* @param clientId
* @param subscriberName
* @return
* @throws IOException
*/
long getMessageSize(String clientId, String subscriberName) throws IOException; long getMessageSize(String clientId, String subscriberName) throws IOException;
/**
* The subscription metrics contained in this store
*
* @param subscriptionKey
* @return
*/
MessageStoreSubscriptionStatistics getMessageStoreSubStatistics();
/** /**
* Finds the subscriber entry for the given consumer info * Finds the subscriber entry for the given consumer info
* *

View File

@ -31,6 +31,7 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStoreStatistics; import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.LRUCache; import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.SubscriptionKey;
@ -171,6 +172,14 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
} }
} }
//Disabled for the memory store, can be enabled later if necessary
private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false);
@Override
public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
return stats;
}
/** /**
* Since we initialize the store with a LRUCache in some cases, we need to account for cache evictions * Since we initialize the store with a LRUCache in some cases, we need to account for cache evictions
* when computing the message store statistics. * when computing the message store statistics.

View File

@ -36,6 +36,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
@ -434,4 +435,11 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
return result; return result;
} }
private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false);
@Override
public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
return stats;
}
} }

View File

@ -29,6 +29,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback; import org.apache.activemq.util.Callback;
@ -230,4 +231,10 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
longTermStore.resetBatching(clientId, subscriptionName); longTermStore.resetBatching(clientId, subscriptionName);
} }
private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false);
@Override
public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
return stats;
}
} }

View File

@ -687,6 +687,26 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
letter.setEnableAckCompaction(enableAckCompaction); letter.setEnableAckCompaction(enableAckCompaction);
} }
/**
* Whether non-blocking subscription statistics have been enabled
*
* @return
*/
public boolean isEnableSubscriptionStatistics() {
return letter.isEnableSubscriptionStatistics();
}
/**
* Enable caching statistics for each subscription to allow non-blocking
* retrieval of metrics. This could incur some overhead to compute if there are a lot
* of subscriptions.
*
* @param enableSubscriptionStatistics
*/
public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) {
letter.setEnableSubscriptionStatistics(enableSubscriptionStatistics);
}
public KahaDBStore getStore() { public KahaDBStore getStore() {
return letter; return letter;
} }

View File

@ -62,6 +62,7 @@ import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.MessageStoreStatistics; import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionIdTransformer; import org.apache.activemq.store.TransactionIdTransformer;
@ -742,6 +743,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
private final AtomicInteger subscriptionCount = new AtomicInteger(); private final AtomicInteger subscriptionCount = new AtomicInteger();
protected final MessageStoreSubscriptionStatistics messageStoreSubStats =
new MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics());
public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
super(destination); super(destination);
this.subscriptionCount.set(getAllSubscriptions().length); this.subscriptionCount.set(getAllSubscriptions().length);
@ -750,6 +754,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
} }
@Override
protected void recoverMessageStoreStatistics() throws IOException {
super.recoverMessageStoreStatistics();
this.recoverMessageStoreSubMetrics();
}
@Override @Override
public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
throws IOException { throws IOException {
@ -884,6 +894,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override @Override
public int getMessageCount(String clientId, String subscriptionName) throws IOException { public int getMessageCount(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName); final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
if (isEnableSubscriptionStatistics()) {
return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount();
} else {
indexLock.writeLock().lock(); indexLock.writeLock().lock();
try { try {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
@ -903,11 +918,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
indexLock.writeLock().unlock(); indexLock.writeLock().unlock();
} }
} }
}
@Override @Override
public long getMessageSize(String clientId, String subscriptionName) throws IOException { public long getMessageSize(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName); final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
if (isEnableSubscriptionStatistics()) {
return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize();
} else {
indexLock.writeLock().lock(); indexLock.writeLock().lock();
try { try {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
@ -927,6 +946,39 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
indexLock.writeLock().unlock(); indexLock.writeLock().unlock();
} }
} }
}
protected void recoverMessageStoreSubMetrics() throws IOException {
if (isEnableSubscriptionStatistics()) {
final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics();
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions
.iterator(tx); iterator.hasNext();) {
Entry<String, KahaSubscriptionCommand> entry = iterator.next();
String subscriptionKey = entry.getKey();
LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
if (cursorPos != null) {
long size = getStoredMessageSize(tx, sd, subscriptionKey);
statistics.getMessageCount(subscriptionKey)
.setCount(getStoredMessageCount(tx, sd, subscriptionKey));
statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0);
}
}
}
});
} finally {
indexLock.writeLock().unlock();
}
}
}
@Override @Override
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
@ -1032,6 +1084,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
@Override
public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
return messageStoreSubStats;
}
} }
String subscriptionKey(String clientId, String subscriptionName) { String subscriptionKey(String clientId, String subscriptionName) {

View File

@ -70,6 +70,8 @@ import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.MessageStoreStatistics; import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
@ -282,6 +284,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean compactAcksIgnoresStoreGrowth = false; private boolean compactAcksIgnoresStoreGrowth = false;
private int checkPointCyclesWithNoGC; private int checkPointCyclesWithNoGC;
private int journalLogOnLastCompactionCheck; private int journalLogOnLastCompactionCheck;
private boolean enableSubscriptionStatistics = false;
@Override @Override
public void doStart() throws Exception { public void doStart() throws Exception {
@ -1420,7 +1423,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
addAckLocationForNewMessage(tx, sd, id); addAckLocationForNewMessage(tx, command.getDestination(), sd, id);
} }
metadata.lastUpdate = location; metadata.lastUpdate = location;
} else { } else {
@ -1482,6 +1485,18 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
//Remove the existing from the size //Remove the existing from the size
decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize()); decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
//update all the subscription metrics
if (enableSubscriptionStatistics && location.getSize() != previousKeys.location.getSize()) {
Iterator<Entry<String, SequenceSet>> iter = sd.ackPositions.iterator(tx);
while (iter.hasNext()) {
Entry<String, SequenceSet> e = iter.next();
if (e.getValue().contains(id)) {
incrementAndAddSizeToStoreStat(key(command.getDestination()), e.getKey(), location.getSize());
decrementAndSubSizeToStoreStat(key(command.getDestination()), e.getKey(), previousKeys.location.getSize());
}
}
}
// on first update previous is original location, on recovery/replay it may be the updated location // on first update previous is original location, on recovery/replay it may be the updated location
if(!previousKeys.location.equals(location)) { if(!previousKeys.location.equals(location)) {
sd.locationIndex.remove(tx, previousKeys.location); sd.locationIndex.remove(tx, previousKeys.location);
@ -1622,6 +1637,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.subscriptionAcks.remove(tx, subscriptionKey); sd.subscriptionAcks.remove(tx, subscriptionKey);
sd.subscriptionCache.remove(subscriptionKey); sd.subscriptionCache.remove(subscriptionKey);
removeAckLocationsForSub(command, tx, sd, subscriptionKey); removeAckLocationsForSub(command, tx, sd, subscriptionKey);
MessageStoreSubscriptionStatistics subStats = getSubStats(key(command.getDestination()));
if (subStats != null) {
subStats.removeSubscription(subscriptionKey);
}
if (sd.subscriptions.isEmpty(tx)) { if (sd.subscriptions.isEmpty(tx)) {
// remove the stored destination // remove the stored destination
@ -2565,10 +2584,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
* @param kahaDestination * @param kahaDestination
*/ */
protected void clearStoreStats(KahaDestination kahaDestination) { protected void clearStoreStats(KahaDestination kahaDestination) {
MessageStoreStatistics storeStats = getStoreStats(key(kahaDestination)); String key = key(kahaDestination);
MessageStoreStatistics storeStats = getStoreStats(key);
MessageStoreSubscriptionStatistics subStats = getSubStats(key);
if (storeStats != null) { if (storeStats != null) {
storeStats.reset(); storeStats.reset();
} }
if (subStats != null) {
subStats.reset();
}
} }
/** /**
@ -2605,8 +2629,41 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} }
} }
protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) {
incrementAndAddSizeToStoreStat(key(kahaDestination), subKey, size);
}
protected void incrementAndAddSizeToStoreStat(String kahaDestKey, String subKey, long size) {
if (enableSubscriptionStatistics) {
MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey);
if (subStats != null && subKey != null) {
subStats.getMessageCount(subKey).increment();
if (size > 0) {
subStats.getMessageSize(subKey).addSize(size);
}
}
}
}
protected void decrementAndSubSizeToStoreStat(String kahaDestKey, String subKey, long size) {
if (enableSubscriptionStatistics) {
MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey);
if (subStats != null && subKey != null) {
subStats.getMessageCount(subKey).decrement();
if (size > 0) {
subStats.getMessageSize(subKey).addSize(-size);
}
}
}
}
protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) {
decrementAndSubSizeToStoreStat(key(kahaDestination), subKey, size);
}
/** /**
* This is a map to cache DestinationStatistics for a specific * This is a map to cache MessageStores for a specific
* KahaDestination key * KahaDestination key
*/ */
protected final ConcurrentMap<String, MessageStore> storeCache = protected final ConcurrentMap<String, MessageStore> storeCache =
@ -2631,6 +2688,20 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return storeStats; return storeStats;
} }
protected MessageStoreSubscriptionStatistics getSubStats(String kahaDestKey) {
MessageStoreSubscriptionStatistics subStats = null;
try {
MessageStore messageStore = storeCache.get(kahaDestKey);
if (messageStore instanceof TopicMessageStore) {
subStats = ((TopicMessageStore)messageStore).getMessageStoreSubStatistics();
}
} catch (Exception e1) {
LOG.error("Getting size counter of destination failed", e1);
}
return subStats;
}
/** /**
* Determine whether this Destination matches the DestinationType * Determine whether this Destination matches the DestinationType
* *
@ -2736,7 +2807,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} }
// on a new message add, all existing subs are interested in this message // on a new message add, all existing subs are interested in this message
private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { private void addAckLocationForNewMessage(Transaction tx, KahaDestination kahaDest,
StoredDestination sd, Long messageSequence) throws IOException {
for(String subscriptionKey : sd.subscriptionCache) { for(String subscriptionKey : sd.subscriptionCache) {
SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
if (sequences == null) { if (sequences == null) {
@ -2748,6 +2820,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.ackPositions.put(tx, subscriptionKey, sequences); sd.ackPositions.put(tx, subscriptionKey, sequences);
} }
MessageKeys key = sd.orderIndex.get(tx, messageSequence);
incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey,
key.location.getSize());
Long count = sd.messageReferences.get(messageSequence); Long count = sd.messageReferences.get(messageSequence);
if (count == null) { if (count == null) {
count = Long.valueOf(0L); count = Long.valueOf(0L);
@ -2819,6 +2895,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.ackPositions.remove(tx, subscriptionKey); sd.ackPositions.remove(tx, subscriptionKey);
} }
MessageKeys key = sd.orderIndex.get(tx, messageSequence);
decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey,
key.location.getSize());
// Check if the message is reference by any other subscription. // Check if the message is reference by any other subscription.
Long count = sd.messageReferences.get(messageSequence); Long count = sd.messageReferences.get(messageSequence);
if (count != null) { if (count != null) {
@ -3786,4 +3866,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void setEnableAckCompaction(boolean enableAckCompaction) { public void setEnableAckCompaction(boolean enableAckCompaction) {
this.enableAckCompaction = enableAckCompaction; this.enableAckCompaction = enableAckCompaction;
} }
/**
* @return
*/
public boolean isEnableSubscriptionStatistics() {
return enableSubscriptionStatistics;
}
/**
* Enable caching statistics for each subscription to allow non-blocking
* retrieval of metrics. This could incur some overhead to compute if there are a lot
* of subscriptions.
*
* @param enableSubscriptionStatistics
*/
public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) {
this.enableSubscriptionStatistics = enableSubscriptionStatistics;
}
} }

View File

@ -46,6 +46,7 @@ import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore; import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionRecoveryListener;
@ -413,6 +414,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
return 0; return 0;
} }
private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false);
@Override
public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
return stats;
}
@Override @Override
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName); final String subscriptionKey = subscriptionKey(clientId, subscriptionName);

View File

@ -21,8 +21,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Arrays; import java.util.concurrent.TimeUnit;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection; import javax.jms.Connection;
@ -49,9 +48,11 @@ import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition; import org.apache.activemq.util.Wait.Condition;
import org.junit.After; import org.junit.After;
import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.runners.Parameterized.Parameters; import org.junit.rules.Timeout;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -71,22 +72,16 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
protected String defaultQueueName = "test.queue"; protected String defaultQueueName = "test.queue";
protected String defaultTopicName = "test.topic"; protected String defaultTopicName = "test.topic";
protected static int maxMessageSize = 1000; protected static int maxMessageSize = 1000;
protected boolean prioritizedMessages; protected final boolean prioritizedMessages;
protected boolean enableSubscriptionStatistics;
@Parameters(name="prioritizedMessages={0}") @Rule
public static Collection<Object[]> data() { public Timeout globalTimeout= new Timeout(60, TimeUnit.SECONDS);
return Arrays.asList(new Object[][] {
// use priority messages
{true},
// don't use priority messages
{false}
});
}
/** /**
* @param prioritizedMessages * @param prioritizedMessages
*/ */
public AbstractPendingMessageCursorTest(boolean prioritizedMessages) { public AbstractPendingMessageCursorTest(final boolean prioritizedMessages) {
super(); super();
this.prioritizedMessages = prioritizedMessages; this.prioritizedMessages = prioritizedMessages;
} }
@ -144,6 +139,9 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
@Test @Test
public void testQueueMessageSize() throws Exception { public void testQueueMessageSize() throws Exception {
//doesn't apply to queues, only run once
Assume.assumeFalse(enableSubscriptionStatistics);
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize); org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
@ -153,6 +151,9 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
@Test @Test
public void testQueueBrowserMessageSize() throws Exception { public void testQueueBrowserMessageSize() throws Exception {
//doesn't apply to queues, only run once
Assume.assumeFalse(enableSubscriptionStatistics);
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize); org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
@ -163,6 +164,9 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
@Test @Test
public void testQueueMessageSizeNonPersistent() throws Exception { public void testQueueMessageSizeNonPersistent() throws Exception {
//doesn't apply to queues, only run once
Assume.assumeFalse(enableSubscriptionStatistics);
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200,
@ -172,6 +176,9 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
@Test @Test
public void testQueueMessageSizePersistentAndNonPersistent() throws Exception { public void testQueueMessageSizePersistentAndNonPersistent() throws Exception {
//doesn't apply to queues, only run once
Assume.assumeFalse(enableSubscriptionStatistics);
AtomicLong publishedNonPersistentMessageSize = new AtomicLong(); AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
@ -185,6 +192,9 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
@Test @Test
public void testQueueMessageSizeAfterConsumption() throws Exception { public void testQueueMessageSizeAfterConsumption() throws Exception {
//doesn't apply to queues, only run once
Assume.assumeFalse(enableSubscriptionStatistics);
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize); org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
@ -198,6 +208,9 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
@Test @Test
public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception { public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception {
//doesn't apply to queues, only run once
Assume.assumeFalse(enableSubscriptionStatistics);
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize); org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
@ -209,7 +222,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
verifyStoreStats(dest, 0, 0); verifyStoreStats(dest, 0, 0);
} }
@Test(timeout=60000) @Test
public void testTopicMessageSize() throws Exception { public void testTopicMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
@ -235,7 +248,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
connection.close(); connection.close();
} }
@Test(timeout=60000) @Test
public void testTopicNonPersistentMessageSize() throws Exception { public void testTopicNonPersistentMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
@ -262,7 +275,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
connection.close(); connection.close();
} }
@Test(timeout=60000) @Test
public void testTopicPersistentAndNonPersistentMessageSize() throws Exception { public void testTopicPersistentAndNonPersistentMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
@ -291,7 +304,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
connection.close(); connection.close();
} }
@Test(timeout=60000) @Test
public void testMessageSizeOneDurable() throws Exception { public void testMessageSizeOneDurable() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
@ -320,7 +333,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
connection.close(); connection.close();
} }
@Test(timeout=60000) @Test
public void testMessageSizeOneDurablePartialConsumption() throws Exception { public void testMessageSizeOneDurablePartialConsumption() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
@ -346,7 +359,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
connection.close(); connection.close();
} }
@Test(timeout=60000) @Test
public void testMessageSizeTwoDurables() throws Exception { public void testMessageSizeTwoDurables() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();

View File

@ -17,47 +17,78 @@
package org.apache.activemq.broker.region.cursors; package org.apache.activemq.broker.region.cursors;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* This test checks that pending message metrics work properly with KahaDB * This test checks that pending message metrics work properly with KahaDB
* *
* AMQ-5923 * AMQ-5923, AMQ-6375
* *
*/ */
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class KahaDBPendingMessageCursorTest extends public class KahaDBPendingMessageCursorTest extends
AbstractPendingMessageCursorTest { AbstractPendingMessageCursorTest {
protected static final Logger LOG = LoggerFactory protected static final Logger LOG = LoggerFactory
.getLogger(KahaDBPendingMessageCursorTest.class); .getLogger(KahaDBPendingMessageCursorTest.class);
@Parameters(name = "prioritizedMessages={0},enableSubscriptionStatistics={1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
// use priority messages
{ true, true },
{ true, false },
// don't use priority messages
{ false, true },
{ false, false }
});
}
@Rule @Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
/** /**
* @param prioritizedMessages * @param prioritizedMessages
*/ */
public KahaDBPendingMessageCursorTest(boolean prioritizedMessages) { public KahaDBPendingMessageCursorTest(final boolean prioritizedMessages,
final boolean enableSubscriptionStatistics) {
super(prioritizedMessages); super(prioritizedMessages);
this.enableSubscriptionStatistics = enableSubscriptionStatistics;
} }
@Override @Override
@ -71,7 +102,10 @@ public class KahaDBPendingMessageCursorTest extends
protected void initPersistence(BrokerService brokerService) protected void initPersistence(BrokerService brokerService)
throws IOException { throws IOException {
broker.setPersistent(true); broker.setPersistent(true);
broker.setDataDirectoryFile(dataFileDir.getRoot()); KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(dataFileDir.getRoot());
persistenceAdapter.setEnableSubscriptionStatistics(enableSubscriptionStatistics);
broker.setPersistenceAdapter(persistenceAdapter);
} }
/** /**
@ -80,7 +114,7 @@ public class KahaDBPendingMessageCursorTest extends
* *
* @throws Exception * @throws Exception
*/ */
@Test(timeout=60000) @Test
public void testDurableMessageSizeAfterRestartAndPublish() throws Exception { public void testDurableMessageSizeAfterRestartAndPublish() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
@ -97,13 +131,18 @@ public class KahaDBPendingMessageCursorTest extends
verifyStoreStats(topic, 200, publishedMessageSize.get()); verifyStoreStats(topic, 200, publishedMessageSize.get());
//should be equal in this case //should be equal in this case
assertEquals(topic.getDurableTopicSubs().get(subKey).getPendingMessageSize(), long beforeRestartSize = topic.getDurableTopicSubs().get(subKey).getPendingMessageSize();
assertEquals(beforeRestartSize,
topic.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize()); topic.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
// stop, restart broker and publish more messages // stop, restart broker and publish more messages
stopBroker(); stopBroker();
this.setUpBroker(false); this.setUpBroker(false);
//verify that after restart the size is the same as before restart on recovery
topic = (Topic) getBroker().getDestination(new ActiveMQTopic(defaultTopicName));
assertEquals(beforeRestartSize, topic.getDurableTopicSubs().get(subKey).getPendingMessageSize());
connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId"); connection.setClientID("clientId");
connection.start(); connection.start();
@ -117,7 +156,7 @@ public class KahaDBPendingMessageCursorTest extends
} }
@Test(timeout=60000) @Test
public void testMessageSizeTwoDurablesPartialConsumption() throws Exception { public void testMessageSizeTwoDurablesPartialConsumption() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
@ -153,7 +192,7 @@ public class KahaDBPendingMessageCursorTest extends
* *
* @throws Exception * @throws Exception
*/ */
@Test(timeout=60000) @Test
public void testNonPersistentDurableMessageSize() throws Exception { public void testNonPersistentDurableMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
@ -170,4 +209,147 @@ public class KahaDBPendingMessageCursorTest extends
verifyStoreStats(topic, 0, 0); verifyStoreStats(topic, 0, 0);
} }
/**
* Test that the subscription counters are properly set when enabled
* and not set when disabled
*
* @throws Exception
*/
@Test
public void testEnabledSubscriptionStatistics() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(
connection, new String[] {"sub1", "sub2"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
MessageStoreSubscriptionStatistics stats = store.getMessageStoreSubStatistics();
if (enableSubscriptionStatistics) {
assertTrue(stats.getMessageCount(subKey.toString()).getCount() == 200);
assertTrue(stats.getMessageSize(subKey.toString()).getTotalSize() > 0);
assertTrue(stats.getMessageCount(subKey2.toString()).getCount() == 200);
assertTrue(stats.getMessageSize(subKey2.toString()).getTotalSize() > 0);
assertEquals(stats.getMessageCount().getCount(),
stats.getMessageCount(subKey.toString()).getCount() +
stats.getMessageSize(subKey.toString()).getCount());
assertEquals(stats.getMessageSize().getTotalSize(),
stats.getMessageSize(subKey.toString()).getTotalSize() +
stats.getMessageSize(subKey2.toString()).getTotalSize());
//Delete second subscription and verify stats are updated accordingly
store.deleteSubscription(subKey2.getClientId(), subKey2.getSubscriptionName());
assertEquals(stats.getMessageCount().getCount(), stats.getMessageCount(subKey.toString()).getCount());
assertEquals(stats.getMessageSize().getTotalSize(), stats.getMessageSize(subKey.toString()).getTotalSize());
assertTrue(stats.getMessageCount(subKey2.toString()).getCount() == 0);
assertTrue(stats.getMessageSize(subKey2.toString()).getTotalSize() == 0);
} else {
assertTrue(stats.getMessageCount(subKey.toString()).getCount() == 0);
assertTrue(stats.getMessageSize(subKey.toString()).getTotalSize() == 0);
assertTrue(stats.getMessageCount(subKey2.toString()).getCount() == 0);
assertTrue(stats.getMessageSize(subKey2.toString()).getTotalSize() == 0);
assertEquals(0, stats.getMessageCount().getCount());
assertEquals(0, stats.getMessageSize().getTotalSize());
}
}
@Test
public void testUpdateMessageSubSize() throws Exception {
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
Session session = connection.createSession(false, TopicSession.AUTO_ACKNOWLEDGE);
javax.jms.Topic dest = session.createTopic(defaultTopicName);
session.createDurableSubscriber(dest, "sub1");
session.createDurableSubscriber(dest, "sub2");
MessageProducer prod = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText("SmallMessage");
prod.send(message);
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub1");
final Topic topic = (Topic) getBroker().getDestination(new ActiveMQTopic(defaultTopicName));
final DurableTopicSubscription sub = topic.getDurableTopicSubs().get(subKey);
final DurableTopicSubscription sub2 = topic.getDurableTopicSubs().get(subKey2);
long sizeBeforeUpdate = sub.getPendingMessageSize();
message = (ActiveMQTextMessage) topic.getMessageStore().getMessage(message.getMessageId());
message.setText("LargerMessageLargerMessage");
//update the message
topic.getMessageStore().updateMessage(message);
//should be at least 10 bytes bigger and match the store size
assertTrue(sub.getPendingMessageSize() > sizeBeforeUpdate + 10);
assertEquals(sub.getPendingMessageSize(), topic.getMessageStore().getMessageSize());
assertEquals(sub.getPendingMessageSize(), sub2.getPendingMessageSize());
}
@Test
public void testUpdateMessageSubSizeAfterConsume() throws Exception {
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
Session session = connection.createSession(false, TopicSession.AUTO_ACKNOWLEDGE);
javax.jms.Topic dest = session.createTopic(defaultTopicName);
session.createDurableSubscriber(dest, "sub1");
TopicSubscriber subscriber2 = session.createDurableSubscriber(dest, "sub2");
MessageProducer prod = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText("SmallMessage");
ActiveMQTextMessage message2 = new ActiveMQTextMessage();
message2.setText("SmallMessage2");
prod.send(message);
prod.send(message2);
//Receive first message for sub 2 and wait for stats to update
subscriber2.receive();
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
final Topic topic = (Topic) getBroker().getDestination(new ActiveMQTopic(defaultTopicName));
final DurableTopicSubscription sub = topic.getDurableTopicSubs().get(subKey);
final DurableTopicSubscription sub2 = topic.getDurableTopicSubs().get(subKey2);
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return sub.getPendingMessageSize() > sub2.getPendingMessageSize();
}
});
long sizeBeforeUpdate = sub.getPendingMessageSize();
long sizeBeforeUpdate2 = sub2.getPendingMessageSize();
//update message 2
message = (ActiveMQTextMessage) topic.getMessageStore().getMessage(message.getMessageId());
message.setText("LargerMessageLargerMessage");
//update the message
topic.getMessageStore().updateMessage(message);
//should be at least 10 bytes bigger and match the store size
assertTrue(sub.getPendingMessageSize() > sizeBeforeUpdate + 10);
assertEquals(sub.getPendingMessageSize(), topic.getMessageStore().getMessageSize());
//Sub2 only has 1 message so should be less than sub, verify that the update message
//didn't update the stats of sub2 and sub1 should be over twice as large since the
//updated message is bigger
assertTrue(sub.getPendingMessageSize() > 2 * sub2.getPendingMessageSize());
assertEquals(sizeBeforeUpdate2, sub2.getPendingMessageSize());
}
} }

View File

@ -17,6 +17,8 @@
package org.apache.activemq.broker.region.cursors; package org.apache.activemq.broker.region.cursors;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection; import javax.jms.Connection;
@ -29,6 +31,7 @@ import org.apache.activemq.util.SubscriptionKey;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -44,6 +47,15 @@ public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursor
.getLogger(MemoryPendingMessageCursorTest.class); .getLogger(MemoryPendingMessageCursorTest.class);
@Parameters(name = "prioritizedMessages={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
// use priority messages
{ true },
// don't use priority messages
{ false } });
}
public MemoryPendingMessageCursorTest(boolean prioritizedMessages) { public MemoryPendingMessageCursorTest(boolean prioritizedMessages) {
super(prioritizedMessages); super(prioritizedMessages);
} }
@ -56,7 +68,7 @@ public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursor
@Override @Override
@Test(timeout=30000) @Test
public void testMessageSizeOneDurable() throws Exception { public void testMessageSizeOneDurable() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
@ -85,7 +97,7 @@ public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursor
} }
@Override @Override
@Test(timeout=30000) @Test
public void testMessageSizeTwoDurables() throws Exception { public void testMessageSizeTwoDurables() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();
@ -117,7 +129,7 @@ public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursor
} }
@Override @Override
@Test(timeout=30000) @Test
public void testMessageSizeOneDurablePartialConsumption() throws Exception { public void testMessageSizeOneDurablePartialConsumption() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong(); AtomicLong publishedMessageSize = new AtomicLong();

View File

@ -37,8 +37,9 @@ public class MultiKahaDBPendingMessageCursorTest extends
/** /**
* @param prioritizedMessages * @param prioritizedMessages
*/ */
public MultiKahaDBPendingMessageCursorTest(boolean prioritizedMessages) { public MultiKahaDBPendingMessageCursorTest(final boolean prioritizedMessages,
super(prioritizedMessages); final boolean enableSubscriptionStatistics) {
super(prioritizedMessages, enableSubscriptionStatistics);
} }
@Override @Override
@ -52,6 +53,7 @@ public class MultiKahaDBPendingMessageCursorTest extends
KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
kahaStore.setJournalMaxFileLength(1024 * 512); kahaStore.setJournalMaxFileLength(1024 * 512);
kahaStore.setEnableSubscriptionStatistics(enableSubscriptionStatistics);
//set up a store per destination //set up a store per destination
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();