diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index 3989646ee4..6fc7fbd496 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -31,6 +31,7 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.store.IndexListener; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.AbstractMessageStore; +import org.apache.activemq.store.MessageStoreStatistics; /** * An implementation of {@link org.apache.activemq.store.MessageStore} which @@ -57,7 +58,7 @@ public class MemoryMessageStore extends AbstractMessageStore { public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { synchronized (messageTable) { messageTable.put(message.getMessageId(), message); - incMessageStoreStatistics(message); + incMessageStoreStatistics(getMessageStoreStatistics(), message); } message.incrementReferenceCount(); message.getMessageId().setFutureOrSequenceLong(sequenceId++); @@ -93,7 +94,7 @@ public class MemoryMessageStore extends AbstractMessageStore { Message removed = messageTable.remove(msgId); if( removed !=null ) { removed.decrementReferenceCount(); - decMessageStoreStatistics(removed); + decMessageStoreStatistics(getMessageStoreStatistics(), removed); } if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) { lastBatchId = null; @@ -198,14 +199,18 @@ public class MemoryMessageStore extends AbstractMessageStore { } } - protected final void incMessageStoreStatistics(Message message) { - getMessageStoreStatistics().getMessageCount().increment(); - getMessageStoreStatistics().getMessageSize().addSize(message.getSize()); + protected static final void incMessageStoreStatistics(final MessageStoreStatistics stats, final Message message) { + if (stats != null && message != null) { + stats.getMessageCount().increment(); + stats.getMessageSize().addSize(message.getSize()); + } } - protected final void decMessageStoreStatistics(Message message) { - getMessageStoreStatistics().getMessageCount().decrement(); - getMessageStoreStatistics().getMessageSize().addSize(-message.getSize()); + protected static final void decMessageStoreStatistics(final MessageStoreStatistics stats, final Message message) { + if (stats != null && message != null) { + stats.getMessageCount().decrement(); + stats.getMessageSize().addSize(-message.getSize()); + } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java index 142547f208..76199d7781 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java @@ -182,10 +182,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic @Override protected void onCacheEviction(Map.Entry eldest) { - if (messageStoreStatistics != null) { - messageStoreStatistics.getMessageCount().decrement(); - messageStoreStatistics.getMessageSize().addSize(-eldest.getValue().getSize()); - } + decMessageStoreStatistics(messageStoreStatistics, eldest.getValue()); } } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 5a36da167c..ac767a7988 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -446,6 +446,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe checkpointThread.join(); } } + //clear the cache on shutdown of the store + storeCache.clear(); } }