This closes #92

This commit is contained in:
Timothy Bish 2015-07-07 16:17:14 -04:00
commit 2b320ac065
34 changed files with 1382 additions and 79 deletions

View File

@ -38,6 +38,7 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
@ -51,6 +52,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -119,6 +121,12 @@ public class DestinationView implements DestinationViewMBean {
return destination.getDestinationStatistics().getMessages().getCount();
}
@Override
public long getStoreMessageSize() {
MessageStore messageStore = destination.getMessageStore();
return messageStore != null ? messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize() : 0;
}
public long getMessagesCached() {
return destination.getDestinationStatistics().getMessagesCached().getCount();
}

View File

@ -121,6 +121,14 @@ public interface DestinationViewMBean {
@MBeanInfo("Number of messages in the destination which are yet to be consumed. Potentially dispatched but unacknowledged.")
long getQueueSize();
/**
* Returns the memory size of all messages in this destination's store
*
* @return Returns the memory size of all messages in this destination's store
*/
@MBeanInfo("The memory size of all messages in this destination's store.")
long getStoreMessageSize();
/**
* @return An array of all the messages in the destination's queue.
*/

View File

@ -375,6 +375,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
messages.setMaxProducersToAudit(getMaxProducersToAudit());
messages.setUseCache(isUseCache());
messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
store.start();
final int messageCount = store.getMessageCount();
if (messageCount > 0 && messages.isRecoveryRequired()) {
BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);

View File

@ -105,6 +105,7 @@ public class Topic extends BaseDestination implements Task {
// misleading metrics.
// int messageCount = store.getMessageCount();
// destinationStatistics.getMessages().setCount(messageCount);
store.start();
}
}

View File

@ -30,6 +30,7 @@ abstract public class AbstractMessageStore implements MessageStore {
protected final ActiveMQDestination destination;
protected boolean prioritizedMessages;
protected IndexListener indexListener;
protected final MessageStoreStatistics messageStoreStatistics = new MessageStoreStatistics();
public AbstractMessageStore(ActiveMQDestination destination) {
this.destination = destination;
@ -41,6 +42,7 @@ abstract public class AbstractMessageStore implements MessageStore {
@Override
public void start() throws Exception {
recoverMessageStoreStatistics();
}
@Override
@ -132,4 +134,23 @@ abstract public class AbstractMessageStore implements MessageStore {
static {
FUTURE = new InlineListenableFuture();
}
@Override
public int getMessageCount() throws IOException {
return (int) getMessageStoreStatistics().getMessageCount().getCount();
}
@Override
public long getMessageSize() throws IOException {
return getMessageStoreStatistics().getMessageSize().getTotalSize();
}
@Override
public MessageStoreStatistics getMessageStoreStatistics() {
return messageStoreStatistics;
}
protected void recoverMessageStoreStatistics() throws IOException {
}
}

View File

@ -158,6 +158,18 @@ public interface MessageStore extends Service {
*/
int getMessageCount() throws IOException;
/**
* @return the size of the messages ready to deliver
* @throws IOException
*/
long getMessageSize() throws IOException;
/**
* @return The statistics bean for this message store
*/
MessageStoreStatistics getMessageStoreStatistics();
/**
* A hint to the Store to reset any batching state for the Destination
*

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.SizeStatisticImpl;
import org.apache.activemq.management.StatsImpl;
/**
* The J2EE Statistics for a Message Sore
*/
public class MessageStoreStatistics extends StatsImpl {
protected CountStatisticImpl messageCount;
protected SizeStatisticImpl messageSize;
public MessageStoreStatistics() {
this(true);
}
public MessageStoreStatistics(boolean enabled) {
messageCount = new CountStatisticImpl("messageCount", "The number of messages in the store passing through the destination");
messageSize = new SizeStatisticImpl("messageSize","Size of messages in the store passing through the destination");
addStatistic("messageCount", messageCount);
addStatistic("messageSize", messageSize);
this.setEnabled(enabled);
}
public CountStatisticImpl getMessageCount() {
return messageCount;
}
public SizeStatisticImpl getMessageSize() {
return messageSize;
}
public void reset() {
if (this.isDoReset()) {
super.reset();
messageCount.reset();
messageSize.reset();
}
}
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
messageCount.setEnabled(enabled);
messageSize.setEnabled(enabled);
}
public void setParent(MessageStoreStatistics parent) {
if (parent != null) {
messageCount.setParent(parent.messageCount);
messageSize.setParent(parent.messageSize);
} else {
messageCount.setParent(null);
messageSize.setParent(null);
}
}
}

View File

@ -100,6 +100,11 @@ public class ProxyMessageStore implements MessageStore {
return delegate.getMessageCount();
}
@Override
public long getMessageSize() throws IOException {
return delegate.getMessageSize();
}
@Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
delegate.recoverNextMessages(maxReturned, listener);
@ -169,4 +174,10 @@ public class ProxyMessageStore implements MessageStore {
public String toString() {
return delegate.toString();
}
@Override
public MessageStoreStatistics getMessageStoreStatistics() {
return delegate.getMessageStoreStatistics();
}
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.store;
import java.io.IOException;
import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -145,6 +144,11 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
return delegate.getMessageCount();
}
@Override
public long getMessageSize() throws IOException {
return delegate.getMessageSize();
}
@Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
delegate.recoverNextMessages(maxReturned, listener);
@ -213,4 +217,10 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
public void registerIndexListener(IndexListener indexListener) {
delegate.registerIndexListener(indexListener);
}
@Override
public MessageStoreStatistics getMessageStoreStatistics() {
return delegate.getMessageStoreStatistics();
}
}

View File

@ -35,8 +35,8 @@ import org.apache.activemq.store.AbstractMessageStore;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which
* uses a
*
*
*
*
*/
public class MemoryMessageStore extends AbstractMessageStore {
@ -56,6 +56,8 @@ public class MemoryMessageStore extends AbstractMessageStore {
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
synchronized (messageTable) {
messageTable.put(message.getMessageId(), message);
getMessageStoreStatistics().getMessageCount().increment();
getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
}
message.incrementReferenceCount();
message.getMessageId().setFutureOrSequenceLong(sequenceId++);
@ -93,6 +95,8 @@ public class MemoryMessageStore extends AbstractMessageStore {
if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
lastBatchId = null;
}
getMessageStoreStatistics().getMessageCount().decrement();
getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize());
}
}
@ -114,20 +118,17 @@ public class MemoryMessageStore extends AbstractMessageStore {
public void removeAllMessages(ConnectionContext context) throws IOException {
synchronized (messageTable) {
messageTable.clear();
getMessageStoreStatistics().reset();
}
}
public void delete() {
synchronized (messageTable) {
messageTable.clear();
getMessageStoreStatistics().reset();
}
}
public int getMessageCount() {
return messageTable.size();
}
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
@ -161,8 +162,34 @@ public class MemoryMessageStore extends AbstractMessageStore {
public void updateMessage(Message message) {
synchronized (messageTable) {
Message original = messageTable.get(message.getMessageId());
//if can't be found then increment count, else remove old size
if (original == null) {
getMessageStoreStatistics().getMessageCount().increment();
} else {
getMessageStoreStatistics().getMessageSize().addSize(-original.getSize());
}
messageTable.put(message.getMessageId(), message);
getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
}
}
@Override
public void recoverMessageStoreStatistics() throws IOException {
synchronized (messageTable) {
long size = 0;
int count = 0;
for (Iterator<Message> iter = messageTable.values().iterator(); iter
.hasNext();) {
Message msg = iter.next();
size += msg.getSize();
}
getMessageStoreStatistics().reset();
getMessageStoreStatistics().getMessageCount().setCount(count);
getMessageStoreStatistics().getMessageSize().setTotalSize(size);
}
}
}

View File

@ -67,6 +67,23 @@ public class SizeStatisticImpl extends StatisticImpl {
}
}
/**
* Reset the total size to the new value
*
* @param size
*/
public synchronized void setTotalSize(long size) {
count++;
totalSize = size;
if (size > maxSize) {
maxSize = size;
}
if (size < minSize || minSize == 0) {
minSize = size;
}
updateSampleTime();
}
/**
* @return the maximum size of any step
*/

View File

@ -304,6 +304,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
}
@Override
public int getMessageCount() throws IOException {
int result = 0;
TransactionContext c = persistenceAdapter.getTransactionContext();
@ -401,4 +402,5 @@ public class JDBCMessageStore extends AbstractMessageStore {
public String toString() {
return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size();
}
}

View File

@ -48,8 +48,8 @@ import org.slf4j.LoggerFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
*
*
*
*/
public class JournalMessageStore extends AbstractMessageStore {
@ -79,7 +79,7 @@ public class JournalMessageStore extends AbstractMessageStore {
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
}
public void setMemoryUsage(MemoryUsage memoryUsage) {
this.memoryUsage=memoryUsage;
longTermStore.setMemoryUsage(memoryUsage);
@ -323,7 +323,7 @@ public class JournalMessageStore extends AbstractMessageStore {
}
/**
*
*
*/
public Message getMessage(MessageId identity) throws IOException {
Message answer = null;
@ -348,7 +348,7 @@ public class JournalMessageStore extends AbstractMessageStore {
* Replays the checkpointStore first as those messages are the oldest ones,
* then messages are replayed from the transaction log and then the cache is
* updated.
*
*
* @param listener
* @throws Exception
*/
@ -404,6 +404,11 @@ public class JournalMessageStore extends AbstractMessageStore {
return longTermStore.getMessageCount();
}
public long getMessageSize() throws IOException {
peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageSize();
}
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
peristenceAdapter.checkpoint(true, true);
longTermStore.recoverNextMessages(maxReturned, listener);

View File

@ -61,6 +61,7 @@ import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionIdTransformer;
@ -503,34 +504,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return loadMessage(location);
}
@Override
public int getMessageCount() throws IOException {
try {
lockAsyncJobQueue();
indexLock.writeLock().lock();
try {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
@Override
public Integer execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count
// of messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
int rc = 0;
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
iterator.next();
rc++;
}
return rc;
}
});
} finally {
indexLock.writeLock().unlock();
}
} finally {
unlockAsyncJobQueue();
}
}
@Override
public boolean isEmpty() throws IOException {
indexLock.writeLock().lock();
@ -716,6 +689,38 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public String toString(){
return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
}
@Override
protected void recoverMessageStoreStatistics() throws IOException {
try {
MessageStoreStatistics recoveredStatistics;
lockAsyncJobQueue();
indexLock.writeLock().lock();
try {
recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
@Override
public MessageStoreStatistics execute(Transaction tx) throws IOException {
MessageStoreStatistics statistics = new MessageStoreStatistics();
// Iterate through all index entries to get the size of each message
StoredDestination sd = getStoredDestination(dest, tx);
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
int locationSize = iterator.next().getKey().getSize();
statistics.getMessageCount().increment();
statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
}
return statistics;
}
});
getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
} finally {
indexLock.writeLock().unlock();
}
} finally {
unlockAsyncJobQueue();
}
}
}
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
@ -993,12 +998,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
return this.transactionStore.proxy(new KahaDBMessageStore(destination));
MessageStore store = this.transactionStore.proxy(new KahaDBMessageStore(destination));
storeCache.put(key(convert(destination)), store);
return store;
}
@Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
TopicMessageStore store = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
storeCache.put(key(convert(destination)), store);
return store;
}
/**

View File

@ -46,6 +46,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -53,10 +54,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
@ -113,7 +119,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
static final int OPEN_STATE = 2;
static final long NOT_ACKED = -1;
static final int VERSION = 5;
static final int VERSION = 6;
protected class Metadata {
protected Page<Metadata> page;
@ -738,7 +744,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
long undoCounter=0;
// Go through all the destinations to see if they have messages past the lastAppendLocation
for (StoredDestination sd : storedDestinations.values()) {
for (String key : storedDestinations.keySet()) {
StoredDestination sd = storedDestinations.get(key);
final ArrayList<Long> matches = new ArrayList<Long>();
// Find all the Locations that are >= than the last Append Location.
@ -755,6 +762,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.messageIdIndex.remove(tx, keys.messageId);
metadata.producerSequenceIdTracker.rollback(keys.messageId);
undoCounter++;
decrementAndSubSizeToStoreStat(key, keys.location.getSize());
// TODO: do we need to modify the ack positions for the pub sub case?
}
}
@ -858,6 +866,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.messageIdIndex.remove(tx, keys.messageId);
LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
undoCounter++;
decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
// TODO: do we need to modify the ack positions for the pub sub case?
}
} else {
@ -1312,6 +1321,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (previous == null) {
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
if (previous == null) {
incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
addAckLocationForNewMessage(tx, sd, id);
@ -1337,7 +1347,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
// record this id in any event, initial send or recovery
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
return id;
return id;
}
void trackPendingAdd(KahaDestination destination, Long seq) {
@ -1367,9 +1378,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
new MessageKeys(command.getMessageId(), location)
);
sd.locationIndex.put(tx, location, id);
incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
// on first update previous is original location, on recovery/replay it may be the updated location
if(previousKeys != null && !previousKeys.location.equals(location)) {
sd.locationIndex.remove(tx, previousKeys.location);
decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
}
metadata.lastUpdate = location;
} else {
@ -1387,6 +1400,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
if (keys != null) {
sd.locationIndex.remove(tx, keys.location);
decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
recordAckMessageReferenceLocation(ackLocation, keys.location);
metadata.lastUpdate = ackLocation;
} else if (LOG.isDebugEnabled()) {
@ -1414,7 +1428,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
recordAckMessageReferenceLocation(ackLocation, keys.location);
}
// The following method handles deleting un-referenced messages.
removeAckLocation(tx, sd, subscriptionKey, sequence);
removeAckLocation(command, tx, sd, subscriptionKey, sequence);
metadata.lastUpdate = ackLocation;
} else if (LOG.isDebugEnabled()) {
LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
@ -1470,6 +1484,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
String key = key(command.getDestination());
storedDestinations.remove(key);
metadata.destinations.remove(tx, key);
clearStoreStats(command.getDestination());
storeCache.remove(key(command.getDestination()));
}
void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
@ -1494,13 +1510,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.subLocations.remove(tx, subscriptionKey);
sd.subscriptionAcks.remove(tx, subscriptionKey);
sd.subscriptionCache.remove(subscriptionKey);
removeAckLocationsForSub(tx, sd, subscriptionKey);
removeAckLocationsForSub(command, tx, sd, subscriptionKey);
if (sd.subscriptions.isEmpty(tx)) {
// remove the stored destination
KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
removeDestinationCommand.setDestination(command.getDestination());
updateIndex(tx, removeDestinationCommand, null);
clearStoreStats(command.getDestination());
}
}
}
@ -1879,6 +1896,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
class StoredDestination {
MessageOrderIndex orderIndex = new MessageOrderIndex();
@ -1912,6 +1930,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
@Override
public StoredDestination readPayload(final DataInput dataIn) throws IOException {
final StoredDestination value = new StoredDestination();
@ -1996,12 +2016,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void execute(Transaction tx) throws IOException {
value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
value.orderIndex.lowPriorityIndex.load(tx);
value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
value.orderIndex.highPriorityIndex.load(tx);
}
});
@ -2100,7 +2120,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Figure out the next key using the last entry in the destination.
rc.orderIndex.configureLast(tx);
rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE);
rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller());
rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
rc.locationIndex.load(tx);
@ -2202,6 +2222,133 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return rc;
}
/**
* Clear the counter for the destination, if one exists.
*
* @param kahaDestination
*/
protected void clearStoreStats(KahaDestination kahaDestination) {
MessageStoreStatistics storeStats = getStoreStats(key(kahaDestination));
if (storeStats != null) {
storeStats.reset();
}
}
/**
* Update MessageStoreStatistics
*
* @param kahaDestination
* @param size
*/
protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
incrementAndAddSizeToStoreStat(key(kahaDestination), size);
}
protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
if (storeStats != null) {
storeStats.getMessageCount().increment();
if (size > 0) {
storeStats.getMessageSize().addSize(size);
}
}
}
protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
decrementAndSubSizeToStoreStat(key(kahaDestination), size);
}
protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
if (storeStats != null) {
storeStats.getMessageCount().decrement();
if (size > 0) {
storeStats.getMessageSize().addSize(-size);
}
}
}
/**
* This is a map to cache DestinationStatistics for a specific
* KahaDestination key
*/
protected final Map<String, MessageStore> storeCache =
new ConcurrentHashMap<String, MessageStore>();
/**
* Locate the storeMessageSize counter for this KahaDestination
* @param kahaDestination
* @return
*/
protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
MessageStoreStatistics storeStats = null;
try {
MessageStore messageStore = storeCache.get(kahaDestKey);
if (messageStore != null) {
storeStats = messageStore.getMessageStoreStatistics();
}
} catch (Exception e1) {
LOG.error("Getting size counter of destination failed", e1);
}
return storeStats;
}
/**
* Determine whether this Destination matches the DestinationType
*
* @param destination
* @param type
* @return
*/
protected boolean matchType(Destination destination,
KahaDestination.DestinationType type) {
if (destination instanceof Topic
&& type.equals(KahaDestination.DestinationType.TOPIC)) {
return true;
} else if (destination instanceof Queue
&& type.equals(KahaDestination.DestinationType.QUEUE)) {
return true;
}
return false;
}
class LocationSizeMarshaller implements Marshaller<Location> {
public LocationSizeMarshaller() {
}
public Location readPayload(DataInput dataIn) throws IOException {
Location rc = new Location();
rc.setDataFileId(dataIn.readInt());
rc.setOffset(dataIn.readInt());
if (metadata.version >= 6) {
rc.setSize(dataIn.readInt());
}
return rc;
}
public void writePayload(Location object, DataOutput dataOut)
throws IOException {
dataOut.writeInt(object.getDataFileId());
dataOut.writeInt(object.getOffset());
dataOut.writeInt(object.getSize());
}
public int getFixedSize() {
return 12;
}
public Location deepCopy(Location source) {
return new Location(source);
}
public boolean isDeepCopySupported() {
return true;
}
}
private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
if (sequences == null) {
@ -2269,7 +2416,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
private void removeAckLocationsForSub(KahaSubscriptionCommand command,
Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
if (!sd.ackPositions.isEmpty(tx)) {
SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
if (sequences == null || sequences.isEmpty()) {
@ -2302,6 +2450,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
sd.orderIndex.remove(tx, entry.getKey());
decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
}
}
}
@ -2314,7 +2463,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
* @param messageSequence
* @throws IOException
*/
private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
private void removeAckLocation(KahaRemoveMessageCommand command,
Transaction tx, StoredDestination sd, String subscriptionKey,
Long messageSequence) throws IOException {
// Remove the sub from the previous location set..
if (messageSequence != null) {
SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
@ -2347,6 +2498,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
sd.orderIndex.remove(tx, entry.getKey());
decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
}
}
}

View File

@ -197,25 +197,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
return msg;
}
@Override
public int getMessageCount() throws IOException {
synchronized(indexMutex) {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
@Override
public Integer execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count of messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
int rc=0;
for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
iterator.next();
rc++;
}
return rc;
}
});
}
}
@Override
public void recover(final MessageRecoveryListener listener) throws Exception {
synchronized(indexMutex) {
@ -297,6 +278,27 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
public void stop() throws Exception {
}
@Override
public void recoverMessageStoreStatistics() throws IOException {
int count = 0;
synchronized(indexMutex) {
count = pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
@Override
public Integer execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count of messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
int rc=0;
for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
iterator.next();
rc++;
}
return rc;
}
});
}
getMessageStoreStatistics().getMessageCount().setCount(count);
}
}
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {

View File

@ -22,8 +22,13 @@ import java.io.IOException;
import org.apache.activemq.store.kahadb.disk.journal.Location;
public class LocationMarshaller implements Marshaller<Location> {
public final static LocationMarshaller INSTANCE = new LocationMarshaller();
public LocationMarshaller () {
}
public Location readPayload(DataInput dataIn) throws IOException {
Location rc = new Location();
rc.setDataFileId(dataIn.readInt());

View File

@ -834,7 +834,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
cursorPosition = cursorResetPosition
}
def getMessageCount: Int = {
override def getMessageCount: Int = {
return db.collectionSize(key).toInt
}

View File

@ -490,11 +490,6 @@ public class StoreQueueCursorOrderTest {
}
@Override
public int getMessageCount() throws IOException {
return 0;
}
@Override
public void resetBatching() {
@ -513,5 +508,10 @@ public class StoreQueueCursorOrderTest {
batch.incrementAndGet();
}
@Override
public void recoverMessageStoreStatistics() throws IOException {
this.getMessageStoreStatistics().reset();
}
}
}

View File

@ -0,0 +1,266 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
import java.util.Random;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This test checks that KahaDB properly sets the new storeMessageSize statistic.
*
* AMQ-5748
*
*/
public abstract class AbstractMessageStoreSizeStatTest {
protected static final Logger LOG = LoggerFactory
.getLogger(AbstractMessageStoreSizeStatTest.class);
protected BrokerService broker;
protected URI brokerConnectURI;
protected String defaultQueueName = "test.queue";
protected static int messageSize = 1000;
@Before
public void startBroker() throws Exception {
setUpBroker(true);
}
protected void setUpBroker(boolean clearDataDir) throws Exception {
broker = new BrokerService();
this.initPersistence(broker);
//set up a transport
TransportConnector connector = broker
.addConnector(new TransportConnector());
connector.setUri(new URI("tcp://0.0.0.0:0"));
connector.setName("tcp");
broker.start();
broker.waitUntilStarted();
brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
}
@After
public void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
protected abstract void initPersistence(BrokerService brokerService) throws IOException;
@Test
public void testMessageSize() throws Exception {
Destination dest = publishTestMessages(200);
verifyStats(dest, 200, 200 * messageSize);
}
@Test
public void testMessageSizeAfterConsumption() throws Exception {
Destination dest = publishTestMessages(200);
verifyStats(dest, 200, 200 * messageSize);
consumeTestMessages();
Thread.sleep(3000);
verifyStats(dest, 0, 0);
}
@Test
public void testMessageSizeDurable() throws Exception {
Destination dest = publishTestMessagesDurable();
//verify the count and size
verifyStats(dest, 200, 200 * messageSize);
}
@Test
public void testMessageSizeAfterDestinationDeletion() throws Exception {
Destination dest = publishTestMessages(200);
verifyStats(dest, 200, 200 * messageSize);
//check that the size is 0 after deletion
broker.removeDestination(dest.getActiveMQDestination());
verifyStats(dest, 0, 0);
}
protected void verifyStats(Destination dest, int count, long minimumSize) throws Exception {
MessageStore messageStore = dest.getMessageStore();
MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
assertEquals(messageStore.getMessageCount(), count);
assertEquals(messageStore.getMessageCount(),
storeStats.getMessageCount().getCount());
assertEquals(messageStore.getMessageSize(),
messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize());
if (count > 0) {
assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize);
} else {
assertEquals(storeStats.getMessageSize().getTotalSize(), 0);
}
}
/**
* Generate random 1 megabyte messages
* @param session
* @return
* @throws JMSException
*/
protected BytesMessage createMessage(Session session) throws JMSException {
final BytesMessage message = session.createBytesMessage();
final byte[] data = new byte[messageSize];
final Random rng = new Random();
rng.nextBytes(data);
message.writeBytes(data);
return message;
}
protected Destination publishTestMessages(int count) throws Exception {
return publishTestMessages(count, defaultQueueName);
}
protected Destination publishTestMessages(int count, String queueName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
queueName);
Destination dest = broker.getDestination(activeMqQueue);
// Start the connection
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
.createConnection();
connection.setClientID("clientId" + queueName);
connection.start();
Session session = connection.createSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
try {
// publish a bunch of non-persistent messages to fill up the temp
// store
MessageProducer prod = session.createProducer(queue);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < count; i++) {
prod.send(createMessage(session));
}
} finally {
connection.stop();
}
return dest;
}
protected Destination consumeTestMessages() throws Exception {
return consumeTestMessages(defaultQueueName);
}
protected Destination consumeTestMessages(String queueName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
queueName);
Destination dest = broker.getDestination(activeMqQueue);
// Start the connection
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
.createConnection();
connection.setClientID("clientId2" + queueName);
connection.start();
Session session = connection.createSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
try {
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < 200; i++) {
consumer.receive();
}
} finally {
connection.stop();
}
return dest;
}
protected Destination publishTestMessagesDurable() throws Exception {
// create a new queue
final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
"test.topic");
Destination dest = broker.getDestination(activeMqTopic);
// Start the connection
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
.createConnection();
connection.setClientID("clientId");
connection.start();
Session session = connection.createSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic");
session.createDurableSubscriber(topic, "sub1");
try {
// publish a bunch of non-persistent messages to fill up the temp
// store
MessageProducer prod = session.createProducer(topic);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 200; i++) {
prod.send(createMessage(session));
}
} finally {
connection.stop();
}
return dest;
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import static org.junit.Assert.assertTrue;
import java.util.Random;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
* compute the size of the messages in the store.
*
*/
public abstract class AbstractMessageStoreSizeTest {
protected static final IdGenerator id = new IdGenerator();
protected ActiveMQQueue destination = new ActiveMQQueue("Test");
protected ProducerId producerId = new ProducerId("1.1.1");
protected static final int MESSAGE_COUNT = 20;
protected static String dataDirectory = "target/test-amq-5748/datadb";
protected static int testMessageSize = 1000;
@Before
public void init() throws Exception {
this.initStore();
}
@After
public void destroy() throws Exception {
this.destroyStore();
}
protected abstract void initStore() throws Exception;
protected abstract void destroyStore() throws Exception;
/**
* This method tests that the message size exists after writing a bunch of messages to the store.
* @throws Exception
*/
@Test
public void testMessageSize() throws Exception {
writeMessages();
long messageSize = getMessageStore().getMessageSize();
assertTrue(getMessageStore().getMessageCount() == 20);
assertTrue(messageSize > 20 * testMessageSize);
}
/**
* Write random byte messages to the store for testing.
*
* @throws Exception
*/
protected void writeMessages() throws Exception {
final ConnectionContext context = new ConnectionContext();
for (int i = 0; i < MESSAGE_COUNT; i++) {
ActiveMQMessage message = new ActiveMQMessage();
final byte[] data = new byte[testMessageSize];
final Random rng = new Random();
rng.nextBytes(data);
message.setContent(new ByteSequence(data));
message.setDestination(destination);
message.setMessageId(new MessageId(id.generateId() + ":1", i));
getMessageStore().addMessage(context, message);
}
}
protected abstract MessageStore getMessageStore();
}

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import org.apache.activemq.store.AbstractMessageStoreSizeTest;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.junit.Test;
/**
* This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
* compute the size of the messages in the store.
*
* For KahaDB specifically, the size was not being stored in in the index ({@link LocationMarshaller}). LocationMarshaller
* has been updated to include an option to include the size in the serialized value. This way the message
* size will be persisted in the index and be available between broker restarts without needing to rebuild the index.
* Note that the KahaDB version has been incremented from 5 to 6 because the index will need to be rebuild when a version
* 5 index is detected since it will be detected as corrupt.
*
*/
public abstract class AbstractKahaDBMessageStoreSizeTest extends AbstractMessageStoreSizeTest {
MessageStore messageStore;
PersistenceAdapter store;
@Override
public void initStore() throws Exception {
createStore(true, dataDirectory);
}
abstract protected void createStore(boolean deleteAllMessages, String directory) throws Exception;
abstract protected String getVersion5Dir();
@Override
public void destroyStore() throws Exception {
if (store != null) {
store.stop();
}
}
/**
* This method tests that the message sizes exist for all messages that exist after messages are recovered
* off of disk.
*
* @throws Exception
*/
@Test
public void testMessageSizeStoreRecovery() throws Exception {
writeMessages();
store.stop();
createStore(false, dataDirectory);
writeMessages();
long messageSize = messageStore.getMessageSize();
assertEquals(40, messageStore.getMessageCount());
assertTrue(messageSize > 40 * testMessageSize);
}
/**
* This method tests that a version 5 store with an old index still works but returns 0 for messgage sizes.
*
* @throws Exception
*/
@Test
public void testMessageSizeStoreRecoveryVersion5() throws Exception {
store.stop();
//Copy over an existing version 5 store with messages
File dataDir = new File(dataDirectory);
if (dataDir.exists())
FileUtils.deleteDirectory(new File(dataDirectory));
FileUtils.copyDirectory(new File(getVersion5Dir()),
dataDir);
//reload store
createStore(false, dataDirectory);
//make sure size is 0
long messageSize = messageStore.getMessageSize();
assertTrue(messageStore.getMessageCount() == 20);
assertTrue(messageSize == 0);
}
/**
* This method tests that a version 5 store with existing messages will correctly be recovered and converted
* to version 6. After index deletion, the index will be rebuilt and will include message sizes.
*
* @throws Exception
*/
@Test
public void testMessageSizeStoreRecoveryVersion5RebuildIndex() throws Exception {
store.stop();
//Copy over an existing version 5 store with messages
File dataDir = new File(dataDirectory);
if (dataDir.exists())
FileUtils.deleteDirectory(new File(dataDirectory));
FileUtils.copyDirectory(new File(getVersion5Dir()),
dataDir);
for (File index : FileUtils.listFiles(new File(dataDirectory), new WildcardFileFilter("*.data"), TrueFileFilter.INSTANCE)) {
FileUtils.deleteQuietly(index);
}
//append more messages...at this point the index should be rebuilt
createStore(false, dataDirectory);
writeMessages();
//after writing new messages to the existing store, make sure the index is rebuilt and size is correct
long messageSize = messageStore.getMessageSize();
assertTrue(messageStore.getMessageCount() == 40);
assertTrue(messageSize > 40 * testMessageSize);
}
@Override
protected MessageStore getMessageStore() {
return messageStore;
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.IOException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
import org.apache.commons.io.FileUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This test checks that KahaDB properly sets the new storeMessageSize
* statistic.
*
* AMQ-5748
*
*/
public class KahaDBMessageStoreSizeStatTest extends
AbstractMessageStoreSizeStatTest {
protected static final Logger LOG = LoggerFactory
.getLogger(KahaDBMessageStoreSizeStatTest.class);
File dataFileDir = new File("target/test-amq-5748/stat-datadb");
@Override
protected void setUpBroker(boolean clearDataDir) throws Exception {
if (clearDataDir && dataFileDir.exists())
FileUtils.cleanDirectory(dataFileDir);
super.setUpBroker(clearDataDir);
}
@Override
protected void initPersistence(BrokerService brokerService)
throws IOException {
broker.setPersistent(true);
broker.setDataDirectoryFile(dataFileDir);
}
/**
* Test that the the counter restores size and works after restart and more
* messages are published
*
* @throws Exception
*/
@Test
public void testMessageSizeAfterRestartAndPublish() throws Exception {
Destination dest = publishTestMessages(200);
// verify the count and size
verifyStats(dest, 200, 200 * messageSize);
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
dest = publishTestMessages(200);
// verify the count and size
verifyStats(dest, 400, 400 * messageSize);
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.File;
import org.apache.activemq.store.MessageStore;
/**
* This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
* compute the size of the messages in the KahaDB Store.
*
*/
public class KahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTest {
@Override
protected void createStore(boolean deleteAllMessages, String directory) throws Exception {
KahaDBStore kahaDBStore = new KahaDBStore();
store = kahaDBStore;
kahaDBStore.setJournalMaxFileLength(1024 * 512);
kahaDBStore.setDeleteAllMessages(deleteAllMessages);
kahaDBStore.setDirectory(new File(directory));
kahaDBStore.start();
messageStore = store.createQueueMessageStore(destination);
messageStore.start();
}
@Override
protected String getVersion5Dir() {
return "src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5";
}
}

View File

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
import org.apache.commons.io.FileUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This test checks that KahaDB properly sets the new storeMessageSize
* statistic.
*
* AMQ-5748
*
*/
public class MultiKahaDBMessageStoreSizeStatTest extends
AbstractMessageStoreSizeStatTest {
protected static final Logger LOG = LoggerFactory
.getLogger(MultiKahaDBMessageStoreSizeStatTest.class);
File dataFileDir = new File("target/test-amq-5748/stat-datadb");
@Override
protected void setUpBroker(boolean clearDataDir) throws Exception {
if (clearDataDir && dataFileDir.exists())
FileUtils.cleanDirectory(dataFileDir);
super.setUpBroker(clearDataDir);
}
@Override
protected void initPersistence(BrokerService brokerService)
throws IOException {
broker.setPersistent(true);
//setup multi-kaha adapter
MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(dataFileDir);
KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
kahaStore.setJournalMaxFileLength(1024 * 512);
//set up a store per destination
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
filtered.setPersistenceAdapter(kahaStore);
filtered.setPerDestination(true);
List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
stores.add(filtered);
persistenceAdapter.setFilteredPersistenceAdapters(stores);
broker.setPersistenceAdapter(persistenceAdapter);
}
/**
* Test that the the counter restores size and works after restart and more
* messages are published
*
* @throws Exception
*/
@Test
public void testMessageSizeAfterRestartAndPublish() throws Exception {
Destination dest = publishTestMessages(200);
// verify the count and size
verifyStats(dest, 200, 200 * messageSize);
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
dest = publishTestMessages(200);
// verify the count and size
verifyStats(dest, 400, 400 * messageSize);
}
@Test
public void testMessageSizeAfterRestartAndPublishMultiQueue() throws Exception {
Destination dest = publishTestMessages(200);
// verify the count and size
verifyStats(dest, 200, 200 * messageSize);
assertTrue(broker.getPersistenceAdapter().size() > 200 * messageSize);
Destination dest2 = publishTestMessages(200, "test.queue2");
// verify the count and size
verifyStats(dest2, 200, 200 * messageSize);
assertTrue(broker.getPersistenceAdapter().size() > 400 * messageSize);
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
dest = publishTestMessages(200);
dest2 = publishTestMessages(200, "test.queue2");
// verify the count and size after publishing messages
verifyStats(dest, 400, 400 * messageSize);
verifyStats(dest2, 400, 400 * messageSize);
System.out.println(broker.getPersistenceAdapter().size());
assertTrue(broker.getPersistenceAdapter().size() > 800 * messageSize);
assertTrue(broker.getPersistenceAdapter().size() >=
(dest.getMessageStore().getMessageSize() + dest2.getMessageStore().getMessageSize()));
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.store.MessageStore;
import org.apache.commons.io.FileUtils;
/**
* This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
* compute the size of the messages in the store.
*
*
*/
public class MultiKahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTest {
@Override
protected void createStore(boolean deleteAllMessages, String directory) throws Exception {
MultiKahaDBPersistenceAdapter multiStore = new MultiKahaDBPersistenceAdapter();
store = multiStore;
File fileDir = new File(directory);
if (deleteAllMessages && fileDir.exists()) {
FileUtils.cleanDirectory(new File(directory));
}
KahaDBPersistenceAdapter localStore = new KahaDBPersistenceAdapter();
localStore.setJournalMaxFileLength(1024 * 512);
localStore.setDirectory(new File(directory));
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
filtered.setPersistenceAdapter(localStore);
filtered.setPerDestination(true);
List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
stores.add(filtered);
multiStore.setFilteredPersistenceAdapters(stores);
multiStore.setDirectory(fileDir);
multiStore.start();
messageStore = store.createQueueMessageStore(destination);
messageStore.start();
}
@Override
protected String getVersion5Dir() {
return "src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5";
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.memory;
import java.io.IOException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This test checks that KahaDB properly sets the new storeMessageSize statistic.
*
* AMQ-5748
*
*/
public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStatTest {
protected static final Logger LOG = LoggerFactory
.getLogger(MemoryMessageStoreSizeStatTest.class);
@Override
protected void initPersistence(BrokerService brokerService) throws IOException {
broker.setPersistent(false);
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.memory;
import org.apache.activemq.store.AbstractMessageStoreSizeTest;
import org.apache.activemq.store.MessageStore;
public class MemoryMessageStoreSizeTest extends AbstractMessageStoreSizeTest {
MemoryMessageStore messageStore;
@Override
public void initStore() throws Exception {
messageStore = new MemoryMessageStore(destination);
messageStore.start();
}
@Override
public void destroyStore() throws Exception {
if (messageStore != null) {
messageStore.stop();
}
}
@Override
protected MessageStore getMessageStore() {
return messageStore;
}
}