diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index b3bf86995d..3e51a49d4e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -38,6 +38,7 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.region.Destination; @@ -51,6 +52,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.selector.SelectorParser; +import org.apache.activemq.store.MessageStore; import org.apache.activemq.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,6 +121,12 @@ public class DestinationView implements DestinationViewMBean { return destination.getDestinationStatistics().getMessages().getCount(); } + @Override + public long getStoreMessageSize() { + MessageStore messageStore = destination.getMessageStore(); + return messageStore != null ? messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize() : 0; + } + public long getMessagesCached() { return destination.getDestinationStatistics().getMessagesCached().getCount(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 60340ffe81..aedc15d8f2 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -121,6 +121,14 @@ public interface DestinationViewMBean { @MBeanInfo("Number of messages in the destination which are yet to be consumed. Potentially dispatched but unacknowledged.") long getQueueSize(); + /** + * Returns the memory size of all messages in this destination's store + * + * @return Returns the memory size of all messages in this destination's store + */ + @MBeanInfo("The memory size of all messages in this destination's store.") + long getStoreMessageSize(); + /** * @return An array of all the messages in the destination's queue. */ diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index af61e1919e..c9823e1c22 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -375,6 +375,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index messages.setMaxProducersToAudit(getMaxProducersToAudit()); messages.setUseCache(isUseCache()); messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); + store.start(); final int messageCount = store.getMessageCount(); if (messageCount > 0 && messages.isRecoveryRequired()) { BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index bda000b696..61c62ce963 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -105,6 +105,7 @@ public class Topic extends BaseDestination implements Task { // misleading metrics. // int messageCount = store.getMessageCount(); // destinationStatistics.getMessages().setCount(messageCount); + store.start(); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java index faa6c1fbcf..413f958c01 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java @@ -30,6 +30,7 @@ abstract public class AbstractMessageStore implements MessageStore { protected final ActiveMQDestination destination; protected boolean prioritizedMessages; protected IndexListener indexListener; + protected final MessageStoreStatistics messageStoreStatistics = new MessageStoreStatistics(); public AbstractMessageStore(ActiveMQDestination destination) { this.destination = destination; @@ -41,6 +42,7 @@ abstract public class AbstractMessageStore implements MessageStore { @Override public void start() throws Exception { + recoverMessageStoreStatistics(); } @Override @@ -132,4 +134,23 @@ abstract public class AbstractMessageStore implements MessageStore { static { FUTURE = new InlineListenableFuture(); } + + @Override + public int getMessageCount() throws IOException { + return (int) getMessageStoreStatistics().getMessageCount().getCount(); + } + + @Override + public long getMessageSize() throws IOException { + return getMessageStoreStatistics().getMessageSize().getTotalSize(); + } + + @Override + public MessageStoreStatistics getMessageStoreStatistics() { + return messageStoreStatistics; + } + + protected void recoverMessageStoreStatistics() throws IOException { + + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java index 4cc472e0c7..aee619a27a 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java @@ -158,6 +158,18 @@ public interface MessageStore extends Service { */ int getMessageCount() throws IOException; + /** + * @return the size of the messages ready to deliver + * @throws IOException + */ + long getMessageSize() throws IOException; + + + /** + * @return The statistics bean for this message store + */ + MessageStoreStatistics getMessageStoreStatistics(); + /** * A hint to the Store to reset any batching state for the Destination * diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreStatistics.java new file mode 100644 index 0000000000..0a2b021610 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreStatistics.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store; + +import org.apache.activemq.management.CountStatisticImpl; +import org.apache.activemq.management.SizeStatisticImpl; +import org.apache.activemq.management.StatsImpl; + +/** + * The J2EE Statistics for a Message Sore + */ +public class MessageStoreStatistics extends StatsImpl { + + protected CountStatisticImpl messageCount; + protected SizeStatisticImpl messageSize; + + + public MessageStoreStatistics() { + this(true); + } + + public MessageStoreStatistics(boolean enabled) { + + messageCount = new CountStatisticImpl("messageCount", "The number of messages in the store passing through the destination"); + messageSize = new SizeStatisticImpl("messageSize","Size of messages in the store passing through the destination"); + + addStatistic("messageCount", messageCount); + addStatistic("messageSize", messageSize); + + this.setEnabled(enabled); + } + + + public CountStatisticImpl getMessageCount() { + return messageCount; + } + + public SizeStatisticImpl getMessageSize() { + return messageSize; + } + + public void reset() { + if (this.isDoReset()) { + super.reset(); + messageCount.reset(); + messageSize.reset(); + } + } + + public void setEnabled(boolean enabled) { + super.setEnabled(enabled); + messageCount.setEnabled(enabled); + messageSize.setEnabled(enabled); + } + + public void setParent(MessageStoreStatistics parent) { + if (parent != null) { + messageCount.setParent(parent.messageCount); + messageSize.setParent(parent.messageSize); + } else { + messageCount.setParent(null); + messageSize.setParent(null); + } + } + +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java index c9b2060b54..cd319a65d3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java @@ -100,6 +100,11 @@ public class ProxyMessageStore implements MessageStore { return delegate.getMessageCount(); } + @Override + public long getMessageSize() throws IOException { + return delegate.getMessageSize(); + } + @Override public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { delegate.recoverNextMessages(maxReturned, listener); @@ -169,4 +174,10 @@ public class ProxyMessageStore implements MessageStore { public String toString() { return delegate.toString(); } + + @Override + public MessageStoreStatistics getMessageStoreStatistics() { + return delegate.getMessageStoreStatistics(); + } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java index 0f47f61b8f..5c591583b0 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java @@ -17,7 +17,6 @@ package org.apache.activemq.store; import java.io.IOException; -import java.util.concurrent.Future; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -145,6 +144,11 @@ public class ProxyTopicMessageStore implements TopicMessageStore { return delegate.getMessageCount(); } + @Override + public long getMessageSize() throws IOException { + return delegate.getMessageSize(); + } + @Override public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { delegate.recoverNextMessages(maxReturned, listener); @@ -213,4 +217,10 @@ public class ProxyTopicMessageStore implements TopicMessageStore { public void registerIndexListener(IndexListener indexListener) { delegate.registerIndexListener(indexListener); } + + @Override + public MessageStoreStatistics getMessageStoreStatistics() { + return delegate.getMessageStoreStatistics(); + } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index 7cdaa78b0f..e71dab834d 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -35,8 +35,8 @@ import org.apache.activemq.store.AbstractMessageStore; /** * An implementation of {@link org.apache.activemq.store.MessageStore} which * uses a - * - * + * + * */ public class MemoryMessageStore extends AbstractMessageStore { @@ -56,6 +56,8 @@ public class MemoryMessageStore extends AbstractMessageStore { public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { synchronized (messageTable) { messageTable.put(message.getMessageId(), message); + getMessageStoreStatistics().getMessageCount().increment(); + getMessageStoreStatistics().getMessageSize().addSize(message.getSize()); } message.incrementReferenceCount(); message.getMessageId().setFutureOrSequenceLong(sequenceId++); @@ -93,6 +95,8 @@ public class MemoryMessageStore extends AbstractMessageStore { if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) { lastBatchId = null; } + getMessageStoreStatistics().getMessageCount().decrement(); + getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize()); } } @@ -114,20 +118,17 @@ public class MemoryMessageStore extends AbstractMessageStore { public void removeAllMessages(ConnectionContext context) throws IOException { synchronized (messageTable) { messageTable.clear(); + getMessageStoreStatistics().reset(); } } public void delete() { synchronized (messageTable) { messageTable.clear(); + getMessageStoreStatistics().reset(); } } - - public int getMessageCount() { - return messageTable.size(); - } - public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { synchronized (messageTable) { boolean pastLackBatch = lastBatchId == null; @@ -161,8 +162,34 @@ public class MemoryMessageStore extends AbstractMessageStore { public void updateMessage(Message message) { synchronized (messageTable) { + Message original = messageTable.get(message.getMessageId()); + + //if can't be found then increment count, else remove old size + if (original == null) { + getMessageStoreStatistics().getMessageCount().increment(); + } else { + getMessageStoreStatistics().getMessageSize().addSize(-original.getSize()); + } messageTable.put(message.getMessageId(), message); + getMessageStoreStatistics().getMessageSize().addSize(message.getSize()); } } - + + @Override + public void recoverMessageStoreStatistics() throws IOException { + synchronized (messageTable) { + long size = 0; + int count = 0; + for (Iterator iter = messageTable.values().iterator(); iter + .hasNext();) { + Message msg = iter.next(); + size += msg.getSize(); + } + + getMessageStoreStatistics().reset(); + getMessageStoreStatistics().getMessageCount().setCount(count); + getMessageStoreStatistics().getMessageSize().setTotalSize(size); + } + } + } diff --git a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java index 1cf0058963..e2bc033e21 100644 --- a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java +++ b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java @@ -67,6 +67,23 @@ public class SizeStatisticImpl extends StatisticImpl { } } + /** + * Reset the total size to the new value + * + * @param size + */ + public synchronized void setTotalSize(long size) { + count++; + totalSize = size; + if (size > maxSize) { + maxSize = size; + } + if (size < minSize || minSize == 0) { + minSize = size; + } + updateSampleTime(); + } + /** * @return the maximum size of any step */ diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 4674d7a6be..ac4e8cef8d 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -304,6 +304,7 @@ public class JDBCMessageStore extends AbstractMessageStore { } } + @Override public int getMessageCount() throws IOException { int result = 0; TransactionContext c = persistenceAdapter.getTransactionContext(); @@ -401,4 +402,5 @@ public class JDBCMessageStore extends AbstractMessageStore { public String toString() { return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size(); } + } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java index 2d44769a61..7ec10c4d9a 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java @@ -48,8 +48,8 @@ import org.slf4j.LoggerFactory; /** * A MessageStore that uses a Journal to store it's messages. - * - * + * + * */ public class JournalMessageStore extends AbstractMessageStore { @@ -79,7 +79,7 @@ public class JournalMessageStore extends AbstractMessageStore { this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext())); } - + public void setMemoryUsage(MemoryUsage memoryUsage) { this.memoryUsage=memoryUsage; longTermStore.setMemoryUsage(memoryUsage); @@ -323,7 +323,7 @@ public class JournalMessageStore extends AbstractMessageStore { } /** - * + * */ public Message getMessage(MessageId identity) throws IOException { Message answer = null; @@ -348,7 +348,7 @@ public class JournalMessageStore extends AbstractMessageStore { * Replays the checkpointStore first as those messages are the oldest ones, * then messages are replayed from the transaction log and then the cache is * updated. - * + * * @param listener * @throws Exception */ @@ -404,6 +404,11 @@ public class JournalMessageStore extends AbstractMessageStore { return longTermStore.getMessageCount(); } + public long getMessageSize() throws IOException { + peristenceAdapter.checkpoint(true, true); + return longTermStore.getMessageSize(); + } + public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { peristenceAdapter.checkpoint(true, true); longTermStore.recoverNextMessages(maxReturned, listener); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 8ceef360a4..44f93a622a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -61,6 +61,7 @@ import org.apache.activemq.store.IndexListener; import org.apache.activemq.store.ListenableFuture; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.MessageStoreStatistics; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionIdTransformer; @@ -503,34 +504,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return loadMessage(location); } - @Override - public int getMessageCount() throws IOException { - try { - lockAsyncJobQueue(); - indexLock.writeLock().lock(); - try { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public Integer execute(Transaction tx) throws IOException { - // Iterate through all index entries to get a count - // of messages in the destination. - StoredDestination sd = getStoredDestination(dest, tx); - int rc = 0; - for (Iterator> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { - iterator.next(); - rc++; - } - return rc; - } - }); - } finally { - indexLock.writeLock().unlock(); - } - } finally { - unlockAsyncJobQueue(); - } - } - @Override public boolean isEmpty() throws IOException { indexLock.writeLock().lock(); @@ -716,6 +689,38 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public String toString(){ return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest)); } + + @Override + protected void recoverMessageStoreStatistics() throws IOException { + try { + MessageStoreStatistics recoveredStatistics; + lockAsyncJobQueue(); + indexLock.writeLock().lock(); + try { + recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure() { + @Override + public MessageStoreStatistics execute(Transaction tx) throws IOException { + MessageStoreStatistics statistics = new MessageStoreStatistics(); + + // Iterate through all index entries to get the size of each message + StoredDestination sd = getStoredDestination(dest, tx); + for (Iterator> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { + int locationSize = iterator.next().getKey().getSize(); + statistics.getMessageCount().increment(); + statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); + } + return statistics; + } + }); + getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount()); + getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize()); + } finally { + indexLock.writeLock().unlock(); + } + } finally { + unlockAsyncJobQueue(); + } + } } class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { @@ -993,12 +998,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { - return this.transactionStore.proxy(new KahaDBMessageStore(destination)); + MessageStore store = this.transactionStore.proxy(new KahaDBMessageStore(destination)); + storeCache.put(key(convert(destination)), store); + return store; } @Override public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { - return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); + TopicMessageStore store = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); + storeCache.put(key(convert(destination)), store); + return store; } /** diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index ef8fe0a396..e35619e4f4 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -46,6 +46,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -53,10 +54,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.TransactionId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.MessageStoreStatistics; import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; @@ -113,7 +119,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe static final int OPEN_STATE = 2; static final long NOT_ACKED = -1; - static final int VERSION = 5; + static final int VERSION = 6; protected class Metadata { protected Page page; @@ -738,7 +744,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe long undoCounter=0; // Go through all the destinations to see if they have messages past the lastAppendLocation - for (StoredDestination sd : storedDestinations.values()) { + for (String key : storedDestinations.keySet()) { + StoredDestination sd = storedDestinations.get(key); final ArrayList matches = new ArrayList(); // Find all the Locations that are >= than the last Append Location. @@ -755,6 +762,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.messageIdIndex.remove(tx, keys.messageId); metadata.producerSequenceIdTracker.rollback(keys.messageId); undoCounter++; + decrementAndSubSizeToStoreStat(key, keys.location.getSize()); // TODO: do we need to modify the ack positions for the pub sub case? } } @@ -858,6 +866,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.messageIdIndex.remove(tx, keys.messageId); LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location); undoCounter++; + decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize()); // TODO: do we need to modify the ack positions for the pub sub case? } } else { @@ -1312,6 +1321,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (previous == null) { previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); if (previous == null) { + incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { addAckLocationForNewMessage(tx, sd, id); @@ -1337,7 +1347,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } // record this id in any event, initial send or recovery metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); - return id; + + return id; } void trackPendingAdd(KahaDestination destination, Long seq) { @@ -1367,9 +1378,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe new MessageKeys(command.getMessageId(), location) ); sd.locationIndex.put(tx, location, id); + incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); // on first update previous is original location, on recovery/replay it may be the updated location if(previousKeys != null && !previousKeys.location.equals(location)) { sd.locationIndex.remove(tx, previousKeys.location); + decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize()); } metadata.lastUpdate = location; } else { @@ -1387,6 +1400,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); if (keys != null) { sd.locationIndex.remove(tx, keys.location); + decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize()); recordAckMessageReferenceLocation(ackLocation, keys.location); metadata.lastUpdate = ackLocation; } else if (LOG.isDebugEnabled()) { @@ -1414,7 +1428,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe recordAckMessageReferenceLocation(ackLocation, keys.location); } // The following method handles deleting un-referenced messages. - removeAckLocation(tx, sd, subscriptionKey, sequence); + removeAckLocation(command, tx, sd, subscriptionKey, sequence); metadata.lastUpdate = ackLocation; } else if (LOG.isDebugEnabled()) { LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); @@ -1470,6 +1484,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe String key = key(command.getDestination()); storedDestinations.remove(key); metadata.destinations.remove(tx, key); + clearStoreStats(command.getDestination()); + storeCache.remove(key(command.getDestination())); } void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { @@ -1494,13 +1510,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.subLocations.remove(tx, subscriptionKey); sd.subscriptionAcks.remove(tx, subscriptionKey); sd.subscriptionCache.remove(subscriptionKey); - removeAckLocationsForSub(tx, sd, subscriptionKey); + removeAckLocationsForSub(command, tx, sd, subscriptionKey); if (sd.subscriptions.isEmpty(tx)) { // remove the stored destination KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand(); removeDestinationCommand.setDestination(command.getDestination()); updateIndex(tx, removeDestinationCommand, null); + clearStoreStats(command.getDestination()); } } } @@ -1879,6 +1896,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + class StoredDestination { MessageOrderIndex orderIndex = new MessageOrderIndex(); @@ -1912,6 +1930,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected class StoredDestinationMarshaller extends VariableMarshaller { + final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); + @Override public StoredDestination readPayload(final DataInput dataIn) throws IOException { final StoredDestination value = new StoredDestination(); @@ -1996,12 +2016,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void execute(Transaction tx) throws IOException { value.orderIndex.lowPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); value.orderIndex.lowPriorityIndex.load(tx); value.orderIndex.highPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller); value.orderIndex.highPriorityIndex.load(tx); } }); @@ -2100,7 +2120,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Figure out the next key using the last entry in the destination. rc.orderIndex.configureLast(tx); - rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE); + rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller()); rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); rc.locationIndex.load(tx); @@ -2202,6 +2222,133 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return rc; } + /** + * Clear the counter for the destination, if one exists. + * + * @param kahaDestination + */ + protected void clearStoreStats(KahaDestination kahaDestination) { + MessageStoreStatistics storeStats = getStoreStats(key(kahaDestination)); + if (storeStats != null) { + storeStats.reset(); + } + } + + /** + * Update MessageStoreStatistics + * + * @param kahaDestination + * @param size + */ + protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) { + incrementAndAddSizeToStoreStat(key(kahaDestination), size); + } + + protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) { + MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); + if (storeStats != null) { + storeStats.getMessageCount().increment(); + if (size > 0) { + storeStats.getMessageSize().addSize(size); + } + } + } + + protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) { + decrementAndSubSizeToStoreStat(key(kahaDestination), size); + } + + protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) { + MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); + if (storeStats != null) { + storeStats.getMessageCount().decrement(); + if (size > 0) { + storeStats.getMessageSize().addSize(-size); + } + } + } + + /** + * This is a map to cache DestinationStatistics for a specific + * KahaDestination key + */ + protected final Map storeCache = + new ConcurrentHashMap(); + + /** + * Locate the storeMessageSize counter for this KahaDestination + * @param kahaDestination + * @return + */ + protected MessageStoreStatistics getStoreStats(String kahaDestKey) { + MessageStoreStatistics storeStats = null; + try { + MessageStore messageStore = storeCache.get(kahaDestKey); + if (messageStore != null) { + storeStats = messageStore.getMessageStoreStatistics(); + } + } catch (Exception e1) { + LOG.error("Getting size counter of destination failed", e1); + } + + return storeStats; + } + + /** + * Determine whether this Destination matches the DestinationType + * + * @param destination + * @param type + * @return + */ + protected boolean matchType(Destination destination, + KahaDestination.DestinationType type) { + if (destination instanceof Topic + && type.equals(KahaDestination.DestinationType.TOPIC)) { + return true; + } else if (destination instanceof Queue + && type.equals(KahaDestination.DestinationType.QUEUE)) { + return true; + } + return false; + } + + class LocationSizeMarshaller implements Marshaller { + + public LocationSizeMarshaller() { + + } + + public Location readPayload(DataInput dataIn) throws IOException { + Location rc = new Location(); + rc.setDataFileId(dataIn.readInt()); + rc.setOffset(dataIn.readInt()); + if (metadata.version >= 6) { + rc.setSize(dataIn.readInt()); + } + return rc; + } + + public void writePayload(Location object, DataOutput dataOut) + throws IOException { + dataOut.writeInt(object.getDataFileId()); + dataOut.writeInt(object.getOffset()); + dataOut.writeInt(object.getSize()); + } + + public int getFixedSize() { + return 12; + } + + public Location deepCopy(Location source) { + return new Location(source); + } + + public boolean isDeepCopySupported() { + return true; + } + } + private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); if (sequences == null) { @@ -2269,7 +2416,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + private void removeAckLocationsForSub(KahaSubscriptionCommand command, + Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { if (!sd.ackPositions.isEmpty(tx)) { SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey); if (sequences == null || sequences.isEmpty()) { @@ -2302,6 +2450,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.locationIndex.remove(tx, entry.getValue().location); sd.messageIdIndex.remove(tx, entry.getValue().messageId); sd.orderIndex.remove(tx, entry.getKey()); + decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize()); } } } @@ -2314,7 +2463,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe * @param messageSequence * @throws IOException */ - private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException { + private void removeAckLocation(KahaRemoveMessageCommand command, + Transaction tx, StoredDestination sd, String subscriptionKey, + Long messageSequence) throws IOException { // Remove the sub from the previous location set.. if (messageSequence != null) { SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); @@ -2347,6 +2498,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.locationIndex.remove(tx, entry.getValue().location); sd.messageIdIndex.remove(tx, entry.getValue().messageId); sd.orderIndex.remove(tx, entry.getKey()); + decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize()); } } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java index 45e35c6f07..04d74b6357 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java @@ -197,25 +197,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA return msg; } - @Override - public int getMessageCount() throws IOException { - synchronized(indexMutex) { - return pageFile.tx().execute(new Transaction.CallableClosure(){ - @Override - public Integer execute(Transaction tx) throws IOException { - // Iterate through all index entries to get a count of messages in the destination. - StoredDestination sd = getStoredDestination(dest, tx); - int rc=0; - for (Iterator> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) { - iterator.next(); - rc++; - } - return rc; - } - }); - } - } - @Override public void recover(final MessageRecoveryListener listener) throws Exception { synchronized(indexMutex) { @@ -297,6 +278,27 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA public void stop() throws Exception { } + @Override + public void recoverMessageStoreStatistics() throws IOException { + int count = 0; + synchronized(indexMutex) { + count = pageFile.tx().execute(new Transaction.CallableClosure(){ + @Override + public Integer execute(Transaction tx) throws IOException { + // Iterate through all index entries to get a count of messages in the destination. + StoredDestination sd = getStoredDestination(dest, tx); + int rc=0; + for (Iterator> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) { + iterator.next(); + rc++; + } + return rc; + } + }); + } + getMessageStoreStatistics().getMessageCount().setCount(count); + } + } class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java index 7826a0ba8b..e859f9c672 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java @@ -22,8 +22,13 @@ import java.io.IOException; import org.apache.activemq.store.kahadb.disk.journal.Location; public class LocationMarshaller implements Marshaller { + public final static LocationMarshaller INSTANCE = new LocationMarshaller(); + public LocationMarshaller () { + + } + public Location readPayload(DataInput dataIn) throws IOException { Location rc = new Location(); rc.setDataFileId(dataIn.readInt()); diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 49e8cfab51..7c2d327519 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -834,7 +834,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P cursorPosition = cursorResetPosition } - def getMessageCount: Int = { + override def getMessageCount: Int = { return db.collectionSize(key).toInt } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java index f8fab10a13..90b8428640 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java @@ -490,11 +490,6 @@ public class StoreQueueCursorOrderTest { } - @Override - public int getMessageCount() throws IOException { - return 0; - } - @Override public void resetBatching() { @@ -513,5 +508,10 @@ public class StoreQueueCursorOrderTest { batch.incrementAndGet(); } + @Override + public void recoverMessageStoreStatistics() throws IOException { + this.getMessageStoreStatistics().reset(); + } + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java new file mode 100644 index 0000000000..59ae44bcec --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URI; +import java.util.Random; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSession; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test checks that KahaDB properly sets the new storeMessageSize statistic. + * + * AMQ-5748 + * + */ +public abstract class AbstractMessageStoreSizeStatTest { + protected static final Logger LOG = LoggerFactory + .getLogger(AbstractMessageStoreSizeStatTest.class); + + + protected BrokerService broker; + protected URI brokerConnectURI; + protected String defaultQueueName = "test.queue"; + protected static int messageSize = 1000; + + @Before + public void startBroker() throws Exception { + setUpBroker(true); + } + + protected void setUpBroker(boolean clearDataDir) throws Exception { + + broker = new BrokerService(); + this.initPersistence(broker); + //set up a transport + TransportConnector connector = broker + .addConnector(new TransportConnector()); + connector.setUri(new URI("tcp://0.0.0.0:0")); + connector.setName("tcp"); + + broker.start(); + broker.waitUntilStarted(); + brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri(); + + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + protected abstract void initPersistence(BrokerService brokerService) throws IOException; + + @Test + public void testMessageSize() throws Exception { + Destination dest = publishTestMessages(200); + verifyStats(dest, 200, 200 * messageSize); + } + + @Test + public void testMessageSizeAfterConsumption() throws Exception { + + Destination dest = publishTestMessages(200); + verifyStats(dest, 200, 200 * messageSize); + + consumeTestMessages(); + Thread.sleep(3000); + verifyStats(dest, 0, 0); + } + + @Test + public void testMessageSizeDurable() throws Exception { + + Destination dest = publishTestMessagesDurable(); + + //verify the count and size + verifyStats(dest, 200, 200 * messageSize); + + } + + @Test + public void testMessageSizeAfterDestinationDeletion() throws Exception { + Destination dest = publishTestMessages(200); + verifyStats(dest, 200, 200 * messageSize); + + //check that the size is 0 after deletion + broker.removeDestination(dest.getActiveMQDestination()); + verifyStats(dest, 0, 0); + } + + protected void verifyStats(Destination dest, int count, long minimumSize) throws Exception { + MessageStore messageStore = dest.getMessageStore(); + MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics(); + assertEquals(messageStore.getMessageCount(), count); + assertEquals(messageStore.getMessageCount(), + storeStats.getMessageCount().getCount()); + assertEquals(messageStore.getMessageSize(), + messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize()); + if (count > 0) { + assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize); + } else { + assertEquals(storeStats.getMessageSize().getTotalSize(), 0); + } + } + + /** + * Generate random 1 megabyte messages + * @param session + * @return + * @throws JMSException + */ + protected BytesMessage createMessage(Session session) throws JMSException { + final BytesMessage message = session.createBytesMessage(); + final byte[] data = new byte[messageSize]; + final Random rng = new Random(); + rng.nextBytes(data); + message.writeBytes(data); + return message; + } + + + protected Destination publishTestMessages(int count) throws Exception { + return publishTestMessages(count, defaultQueueName); + } + + protected Destination publishTestMessages(int count, String queueName) throws Exception { + // create a new queue + final ActiveMQDestination activeMqQueue = new ActiveMQQueue( + queueName); + + Destination dest = broker.getDestination(activeMqQueue); + + // Start the connection + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI) + .createConnection(); + connection.setClientID("clientId" + queueName); + connection.start(); + Session session = connection.createSession(false, + QueueSession.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + + try { + // publish a bunch of non-persistent messages to fill up the temp + // store + MessageProducer prod = session.createProducer(queue); + prod.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < count; i++) { + prod.send(createMessage(session)); + } + + } finally { + connection.stop(); + } + + return dest; + } + + protected Destination consumeTestMessages() throws Exception { + return consumeTestMessages(defaultQueueName); + } + + protected Destination consumeTestMessages(String queueName) throws Exception { + // create a new queue + final ActiveMQDestination activeMqQueue = new ActiveMQQueue( + queueName); + + Destination dest = broker.getDestination(activeMqQueue); + + // Start the connection + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI) + .createConnection(); + connection.setClientID("clientId2" + queueName); + connection.start(); + Session session = connection.createSession(false, + QueueSession.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + + try { + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < 200; i++) { + consumer.receive(); + } + + } finally { + connection.stop(); + } + + return dest; + } + + protected Destination publishTestMessagesDurable() throws Exception { + // create a new queue + final ActiveMQDestination activeMqTopic = new ActiveMQTopic( + "test.topic"); + + Destination dest = broker.getDestination(activeMqTopic); + + // Start the connection + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI) + .createConnection(); + connection.setClientID("clientId"); + connection.start(); + Session session = connection.createSession(false, + TopicSession.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("test.topic"); + session.createDurableSubscriber(topic, "sub1"); + + try { + // publish a bunch of non-persistent messages to fill up the temp + // store + MessageProducer prod = session.createProducer(topic); + prod.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < 200; i++) { + prod.send(createMessage(session)); + } + + } finally { + connection.stop(); + } + + return dest; + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java new file mode 100644 index 0000000000..923bc82c4d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store; + +import static org.junit.Assert.assertTrue; + +import java.util.Random; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IdGenerator; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly + * compute the size of the messages in the store. + * + */ +public abstract class AbstractMessageStoreSizeTest { + + protected static final IdGenerator id = new IdGenerator(); + protected ActiveMQQueue destination = new ActiveMQQueue("Test"); + protected ProducerId producerId = new ProducerId("1.1.1"); + protected static final int MESSAGE_COUNT = 20; + protected static String dataDirectory = "target/test-amq-5748/datadb"; + protected static int testMessageSize = 1000; + + @Before + public void init() throws Exception { + this.initStore(); + } + + @After + public void destroy() throws Exception { + this.destroyStore(); + } + + protected abstract void initStore() throws Exception; + + + protected abstract void destroyStore() throws Exception; + + + /** + * This method tests that the message size exists after writing a bunch of messages to the store. + * @throws Exception + */ + @Test + public void testMessageSize() throws Exception { + writeMessages(); + long messageSize = getMessageStore().getMessageSize(); + assertTrue(getMessageStore().getMessageCount() == 20); + assertTrue(messageSize > 20 * testMessageSize); + } + + + /** + * Write random byte messages to the store for testing. + * + * @throws Exception + */ + protected void writeMessages() throws Exception { + final ConnectionContext context = new ConnectionContext(); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + ActiveMQMessage message = new ActiveMQMessage(); + final byte[] data = new byte[testMessageSize]; + final Random rng = new Random(); + rng.nextBytes(data); + message.setContent(new ByteSequence(data)); + message.setDestination(destination); + message.setMessageId(new MessageId(id.generateId() + ":1", i)); + getMessageStore().addMessage(context, message); + } + } + + protected abstract MessageStore getMessageStore(); +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java new file mode 100644 index 0000000000..7d53cbd0f0 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; + +import org.apache.activemq.store.AbstractMessageStoreSizeTest; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.junit.Test; + +/** + * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly + * compute the size of the messages in the store. + * + * For KahaDB specifically, the size was not being stored in in the index ({@link LocationMarshaller}). LocationMarshaller + * has been updated to include an option to include the size in the serialized value. This way the message + * size will be persisted in the index and be available between broker restarts without needing to rebuild the index. + * Note that the KahaDB version has been incremented from 5 to 6 because the index will need to be rebuild when a version + * 5 index is detected since it will be detected as corrupt. + * + */ +public abstract class AbstractKahaDBMessageStoreSizeTest extends AbstractMessageStoreSizeTest { + + MessageStore messageStore; + PersistenceAdapter store; + + @Override + public void initStore() throws Exception { + createStore(true, dataDirectory); + } + + abstract protected void createStore(boolean deleteAllMessages, String directory) throws Exception; + + abstract protected String getVersion5Dir(); + + @Override + public void destroyStore() throws Exception { + if (store != null) { + store.stop(); + } + } + + + /** + * This method tests that the message sizes exist for all messages that exist after messages are recovered + * off of disk. + * + * @throws Exception + */ + @Test + public void testMessageSizeStoreRecovery() throws Exception { + writeMessages(); + store.stop(); + + createStore(false, dataDirectory); + writeMessages(); + long messageSize = messageStore.getMessageSize(); + assertEquals(40, messageStore.getMessageCount()); + assertTrue(messageSize > 40 * testMessageSize); + } + + /** + * This method tests that a version 5 store with an old index still works but returns 0 for messgage sizes. + * + * @throws Exception + */ + @Test + public void testMessageSizeStoreRecoveryVersion5() throws Exception { + store.stop(); + + //Copy over an existing version 5 store with messages + File dataDir = new File(dataDirectory); + if (dataDir.exists()) + FileUtils.deleteDirectory(new File(dataDirectory)); + FileUtils.copyDirectory(new File(getVersion5Dir()), + dataDir); + + //reload store + createStore(false, dataDirectory); + + //make sure size is 0 + long messageSize = messageStore.getMessageSize(); + assertTrue(messageStore.getMessageCount() == 20); + assertTrue(messageSize == 0); + + + } + + /** + * This method tests that a version 5 store with existing messages will correctly be recovered and converted + * to version 6. After index deletion, the index will be rebuilt and will include message sizes. + * + * @throws Exception + */ + @Test + public void testMessageSizeStoreRecoveryVersion5RebuildIndex() throws Exception { + store.stop(); + + //Copy over an existing version 5 store with messages + File dataDir = new File(dataDirectory); + if (dataDir.exists()) + FileUtils.deleteDirectory(new File(dataDirectory)); + FileUtils.copyDirectory(new File(getVersion5Dir()), + dataDir); + for (File index : FileUtils.listFiles(new File(dataDirectory), new WildcardFileFilter("*.data"), TrueFileFilter.INSTANCE)) { + FileUtils.deleteQuietly(index); + } + + //append more messages...at this point the index should be rebuilt + createStore(false, dataDirectory); + writeMessages(); + + //after writing new messages to the existing store, make sure the index is rebuilt and size is correct + long messageSize = messageStore.getMessageSize(); + assertTrue(messageStore.getMessageCount() == 40); + assertTrue(messageSize > 40 * testMessageSize); + + } + + @Override + protected MessageStore getMessageStore() { + return messageStore; + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java new file mode 100644 index 0000000000..bb46f20e55 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.File; +import java.io.IOException; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.store.AbstractMessageStoreSizeStatTest; +import org.apache.commons.io.FileUtils; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test checks that KahaDB properly sets the new storeMessageSize + * statistic. + * + * AMQ-5748 + * + */ +public class KahaDBMessageStoreSizeStatTest extends + AbstractMessageStoreSizeStatTest { + protected static final Logger LOG = LoggerFactory + .getLogger(KahaDBMessageStoreSizeStatTest.class); + + File dataFileDir = new File("target/test-amq-5748/stat-datadb"); + + @Override + protected void setUpBroker(boolean clearDataDir) throws Exception { + if (clearDataDir && dataFileDir.exists()) + FileUtils.cleanDirectory(dataFileDir); + super.setUpBroker(clearDataDir); + } + + @Override + protected void initPersistence(BrokerService brokerService) + throws IOException { + broker.setPersistent(true); + broker.setDataDirectoryFile(dataFileDir); + } + + /** + * Test that the the counter restores size and works after restart and more + * messages are published + * + * @throws Exception + */ + @Test + public void testMessageSizeAfterRestartAndPublish() throws Exception { + + Destination dest = publishTestMessages(200); + + // verify the count and size + verifyStats(dest, 200, 200 * messageSize); + + // stop, restart broker and publish more messages + stopBroker(); + this.setUpBroker(false); + dest = publishTestMessages(200); + + // verify the count and size + verifyStats(dest, 400, 400 * messageSize); + + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java new file mode 100644 index 0000000000..43dc2f6f2f --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.File; + +import org.apache.activemq.store.MessageStore; + +/** + * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly + * compute the size of the messages in the KahaDB Store. + * + */ +public class KahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTest { + + @Override + protected void createStore(boolean deleteAllMessages, String directory) throws Exception { + KahaDBStore kahaDBStore = new KahaDBStore(); + store = kahaDBStore; + kahaDBStore.setJournalMaxFileLength(1024 * 512); + kahaDBStore.setDeleteAllMessages(deleteAllMessages); + kahaDBStore.setDirectory(new File(directory)); + kahaDBStore.start(); + messageStore = store.createQueueMessageStore(destination); + messageStore.start(); + } + + @Override + protected String getVersion5Dir() { + return "src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5"; + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java new file mode 100644 index 0000000000..4342e1d529 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.store.AbstractMessageStoreSizeStatTest; +import org.apache.commons.io.FileUtils; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test checks that KahaDB properly sets the new storeMessageSize + * statistic. + * + * AMQ-5748 + * + */ +public class MultiKahaDBMessageStoreSizeStatTest extends + AbstractMessageStoreSizeStatTest { + protected static final Logger LOG = LoggerFactory + .getLogger(MultiKahaDBMessageStoreSizeStatTest.class); + + File dataFileDir = new File("target/test-amq-5748/stat-datadb"); + + @Override + protected void setUpBroker(boolean clearDataDir) throws Exception { + if (clearDataDir && dataFileDir.exists()) + FileUtils.cleanDirectory(dataFileDir); + super.setUpBroker(clearDataDir); + } + + @Override + protected void initPersistence(BrokerService brokerService) + throws IOException { + broker.setPersistent(true); + + //setup multi-kaha adapter + MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(dataFileDir); + + KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); + kahaStore.setJournalMaxFileLength(1024 * 512); + + //set up a store per destination + FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); + filtered.setPersistenceAdapter(kahaStore); + filtered.setPerDestination(true); + List stores = new ArrayList<>(); + stores.add(filtered); + + persistenceAdapter.setFilteredPersistenceAdapters(stores); + broker.setPersistenceAdapter(persistenceAdapter); + } + + /** + * Test that the the counter restores size and works after restart and more + * messages are published + * + * @throws Exception + */ + @Test + public void testMessageSizeAfterRestartAndPublish() throws Exception { + + Destination dest = publishTestMessages(200); + + // verify the count and size + verifyStats(dest, 200, 200 * messageSize); + + // stop, restart broker and publish more messages + stopBroker(); + this.setUpBroker(false); + dest = publishTestMessages(200); + + // verify the count and size + verifyStats(dest, 400, 400 * messageSize); + + } + + @Test + public void testMessageSizeAfterRestartAndPublishMultiQueue() throws Exception { + + Destination dest = publishTestMessages(200); + + // verify the count and size + verifyStats(dest, 200, 200 * messageSize); + assertTrue(broker.getPersistenceAdapter().size() > 200 * messageSize); + + Destination dest2 = publishTestMessages(200, "test.queue2"); + + // verify the count and size + verifyStats(dest2, 200, 200 * messageSize); + assertTrue(broker.getPersistenceAdapter().size() > 400 * messageSize); + + // stop, restart broker and publish more messages + stopBroker(); + this.setUpBroker(false); + dest = publishTestMessages(200); + dest2 = publishTestMessages(200, "test.queue2"); + + // verify the count and size after publishing messages + verifyStats(dest, 400, 400 * messageSize); + verifyStats(dest2, 400, 400 * messageSize); + + System.out.println(broker.getPersistenceAdapter().size()); + assertTrue(broker.getPersistenceAdapter().size() > 800 * messageSize); + assertTrue(broker.getPersistenceAdapter().size() >= + (dest.getMessageStore().getMessageSize() + dest2.getMessageStore().getMessageSize())); + + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java new file mode 100644 index 0000000000..398b2f730b --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.store.MessageStore; +import org.apache.commons.io.FileUtils; + +/** + * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly + * compute the size of the messages in the store. + * + * + */ +public class MultiKahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTest { + + + @Override + protected void createStore(boolean deleteAllMessages, String directory) throws Exception { + MultiKahaDBPersistenceAdapter multiStore = new MultiKahaDBPersistenceAdapter(); + + store = multiStore; + File fileDir = new File(directory); + + if (deleteAllMessages && fileDir.exists()) { + FileUtils.cleanDirectory(new File(directory)); + } + + KahaDBPersistenceAdapter localStore = new KahaDBPersistenceAdapter(); + localStore.setJournalMaxFileLength(1024 * 512); + localStore.setDirectory(new File(directory)); + + FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); + filtered.setPersistenceAdapter(localStore); + filtered.setPerDestination(true); + List stores = new ArrayList<>(); + stores.add(filtered); + + multiStore.setFilteredPersistenceAdapters(stores); + multiStore.setDirectory(fileDir); + multiStore.start(); + messageStore = store.createQueueMessageStore(destination); + messageStore.start(); + } + + @Override + protected String getVersion5Dir() { + return "src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5"; + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java new file mode 100644 index 0000000000..755936ca69 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.memory; + +import java.io.IOException; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.AbstractMessageStoreSizeStatTest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test checks that KahaDB properly sets the new storeMessageSize statistic. + * + * AMQ-5748 + * + */ +public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStatTest { + protected static final Logger LOG = LoggerFactory + .getLogger(MemoryMessageStoreSizeStatTest.class); + + @Override + protected void initPersistence(BrokerService brokerService) throws IOException { + broker.setPersistent(false); + broker.setPersistenceAdapter(new MemoryPersistenceAdapter()); + } + + + + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeTest.java new file mode 100644 index 0000000000..19a01ab77d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.memory; + +import org.apache.activemq.store.AbstractMessageStoreSizeTest; +import org.apache.activemq.store.MessageStore; + +public class MemoryMessageStoreSizeTest extends AbstractMessageStoreSizeTest { + + MemoryMessageStore messageStore; + + @Override + public void initStore() throws Exception { + messageStore = new MemoryMessageStore(destination); + messageStore.start(); + } + + + @Override + public void destroyStore() throws Exception { + if (messageStore != null) { + messageStore.stop(); + } + } + + @Override + protected MessageStore getMessageStore() { + return messageStore; + } + +} diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db-1.log b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db-1.log new file mode 100644 index 0000000000..18f34cf19d Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db-1.log differ diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.data b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.data new file mode 100644 index 0000000000..8854b717db Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.data differ diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.redo b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.redo new file mode 100644 index 0000000000..eaa3d6f245 Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.redo differ diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db-1.log b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db-1.log new file mode 100644 index 0000000000..60d259be66 Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db-1.log differ diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.data b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.data new file mode 100644 index 0000000000..721ec11c76 Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.data differ diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.redo b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.redo new file mode 100644 index 0000000000..e7fe129498 Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.redo differ