Adding metrics to track the pending message size for a queue and for
subscribers.  This is useful so that not only the pending count is
known but also the total message size left to consume. Also improving
the message size store tests as well.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-08-21 18:58:08 +00:00
parent b17cc37ef9
commit 734fb7dda3
43 changed files with 1924 additions and 331 deletions

View File

@ -581,6 +581,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
return pending.size();
}
@Override
public long getPendingMessageSize() {
synchronized (pendingLock) {
return pending.messageSize();
}
}
@Override
public int getDispatchedQueueSize() {
return dispatched.size();

View File

@ -927,6 +927,19 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
return msg;
}
public long getPendingMessageSize() {
messagesLock.readLock().lock();
try{
return messages.messageSize();
} finally {
messagesLock.readLock().unlock();
}
}
public long getPendingMessageCount() {
return this.destinationStatistics.getMessages().getCount();
}
@Override
public String toString() {
return destination.getQualifiedName() + ", subscriptions=" + consumers.size()

View File

@ -118,6 +118,11 @@ public interface Subscription extends SubscriptionRecovery {
*/
int getPendingQueueSize();
/**
* @return size of the messages pending delivery
*/
long getPendingMessageSize();
/**
* @return number of messages dispatched to the client
*/

View File

@ -418,6 +418,13 @@ public class TopicSubscription extends AbstractSubscription {
return matched();
}
@Override
public long getPendingMessageSize() {
synchronized (matchedListMutex) {
return matched.messageSize();
}
}
@Override
public int getDispatchedQueueSize() {
return (int)(getSubscriptionStatistics().getDispatched().getCount() -

View File

@ -23,6 +23,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
@ -49,6 +50,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
final MessageId[] lastCachedIds = new MessageId[2];
protected boolean hadSpace = false;
protected AbstractStoreCursor(Destination destination) {
super((destination != null ? destination.isPrioritizedMessages():false));
this.regionDestination=destination;
@ -60,6 +63,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
@Override
public final synchronized void start() throws Exception{
if (!isStarted()) {
super.start();
@ -78,6 +82,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
resetSize();
}
@Override
public final synchronized void stop() throws Exception {
resetBatch();
super.stop();
@ -85,6 +90,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
@Override
public final boolean recoverMessage(Message message) throws Exception {
return recoverMessage(message,false);
}
@ -136,6 +142,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
duplicatesFromStore.clear();
}
@Override
public final synchronized void reset() {
if (batchList.isEmpty()) {
try {
@ -150,6 +157,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
@Override
public synchronized void release() {
clearIterator(false);
}
@ -173,6 +181,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
@Override
public final synchronized boolean hasNext() {
if (batchList.isEmpty()) {
try {
@ -187,6 +196,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
@Override
public final synchronized MessageReference next() {
MessageReference result = null;
if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
@ -199,6 +209,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return result;
}
@Override
public synchronized boolean addMessageLast(MessageReference node) throws Exception {
boolean disableCache = false;
if (hasSpace()) {
@ -333,12 +344,14 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
@Override
public synchronized void addMessageFirst(MessageReference node) throws Exception {
setCacheEnabled(false);
size++;
}
@Override
public final synchronized void remove() {
size--;
if (iterator!=null) {
@ -350,6 +363,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
@Override
public final synchronized void remove(MessageReference node) {
if (batchList.remove(node) != null) {
size--;
@ -358,11 +372,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
@Override
public final synchronized void clear() {
gc();
}
@Override
public synchronized void gc() {
for (MessageReference msg : batchList) {
rollback(msg.getMessageId());
@ -374,6 +390,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
setCacheEnabled(false);
}
@Override
protected final synchronized void fillBatch() {
if (LOG.isTraceEnabled()) {
LOG.trace("{} fillBatch", this);
@ -395,17 +412,20 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
@Override
public final synchronized boolean isEmpty() {
// negative means more messages added to store through queue.send since last reset
return size == 0;
}
@Override
public final synchronized boolean hasMessagesBufferedToDeliver() {
return !batchList.isEmpty();
}
@Override
public final synchronized int size() {
if (size < 0) {
this.size = getStoreSize();
@ -413,6 +433,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return size;
}
@Override
public final synchronized long messageSize() {
return getStoreMessageSize();
}
@Override
public String toString() {
return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
@ -428,6 +453,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
protected abstract int getStoreSize();
protected abstract long getStoreMessageSize();
protected abstract boolean isStoreEmpty();
public Subscription getSubscription() {

View File

@ -44,8 +44,8 @@ import org.apache.activemq.util.ByteSequence;
/**
* persist pending messages pending message (messages awaiting dispatch to a
* consumer) cursor
*
*
*
*
*/
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
@ -198,15 +198,15 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/**
* add message to await dispatch
*
*
* @param node
* @throws Exception
* @throws Exception
*/
@Override
public synchronized boolean addMessageLast(MessageReference node) throws Exception {
return tryAddMessageLast(node, 0);
}
@Override
public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
if (!node.isExpired()) {
@ -252,7 +252,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/**
* add message to await dispatch
*
*
* @param node
*/
@Override
@ -356,6 +356,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
}
@Override
public synchronized long messageSize() {
return memoryList.messageSize() + (isDiskListEmpty() ? 0 : (int)getDiskList().messageSize());
}
/**
* clear all pending messages
*/
@ -389,6 +394,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
super.setSystemUsage(usageManager);
}
@Override
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
synchronized (this) {
@ -497,10 +503,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public MessageReference next() {
try {
PListEntry entry = iterator.next();
@ -513,6 +521,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
}
@Override
public void remove() {
iterator.remove();
}

View File

@ -25,13 +25,23 @@ import java.util.Map;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.management.SizeStatisticImpl;
public class OrderedPendingList implements PendingList {
private PendingNode root = null;
private PendingNode tail = null;
private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
private final SizeStatisticImpl messageSize;
private final PendingMessageHelper pendingMessageHelper;
public OrderedPendingList() {
messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages");
messageSize.setEnabled(true);
pendingMessageHelper = new PendingMessageHelper(map, messageSize);
}
@Override
public PendingNode addMessageFirst(MessageReference message) {
PendingNode node = new PendingNode(this, message);
if (root == null) {
@ -41,10 +51,11 @@ public class OrderedPendingList implements PendingList {
root.linkBefore(node);
root = node;
}
this.map.put(message.getMessageId(), node);
pendingMessageHelper.addToMap(message, node);
return node;
}
@Override
public PendingNode addMessageLast(MessageReference message) {
PendingNode node = new PendingNode(this, message);
if (root == null) {
@ -53,29 +64,35 @@ public class OrderedPendingList implements PendingList {
tail.linkAfter(node);
}
tail = node;
this.map.put(message.getMessageId(), node);
pendingMessageHelper.addToMap(message, node);
return node;
}
@Override
public void clear() {
this.root = null;
this.tail = null;
this.map.clear();
this.messageSize.reset();
}
@Override
public boolean isEmpty() {
return this.map.isEmpty();
}
@Override
public Iterator<MessageReference> iterator() {
return new Iterator<MessageReference>() {
private PendingNode current = null;
private PendingNode next = root;
@Override
public boolean hasNext() {
return next != null;
}
@Override
public MessageReference next() {
MessageReference result = null;
this.current = this.next;
@ -84,31 +101,39 @@ public class OrderedPendingList implements PendingList {
return result;
}
@Override
public void remove() {
if (this.current != null && this.current.getMessage() != null) {
map.remove(this.current.getMessage().getMessageId());
pendingMessageHelper.removeFromMap(this.current.getMessage());
}
removeNode(this.current);
}
};
}
@Override
public PendingNode remove(MessageReference message) {
PendingNode node = null;
if (message != null) {
node = this.map.remove(message.getMessageId());
node = pendingMessageHelper.removeFromMap(message);
removeNode(node);
}
return node;
}
@Override
public int size() {
return this.map.size();
}
@Override
public long messageSize() {
return this.messageSize.getTotalSize();
}
void removeNode(PendingNode node) {
if (node != null) {
map.remove(node.getMessage().getMessageId());
pendingMessageHelper.removeFromMap(node.getMessage());
if (root == node) {
root = (PendingNode) node.getNext();
}

View File

@ -73,6 +73,8 @@ public interface PendingList extends Iterable<MessageReference> {
*/
public int size();
public long messageSize();
/**
* Returns an iterator over the pending Messages. The subclass controls how
* the returned iterator actually traverses the list of pending messages allowing
@ -81,6 +83,7 @@ public interface PendingList extends Iterable<MessageReference> {
*
* @return an Iterator that returns MessageReferences contained in this list.
*/
@Override
public Iterator<MessageReference> iterator();
/**

View File

@ -30,14 +30,14 @@ import org.apache.activemq.usage.SystemUsage;
/**
* Interface to pending message (messages awaiting disptach to a consumer)
* cursor
*
*
*
*
*/
public interface PendingMessageCursor extends Service {
/**
* Add a destination
*
*
* @param context
* @param destination
* @throws Exception
@ -46,7 +46,7 @@ public interface PendingMessageCursor extends Service {
/**
* remove a destination
*
*
* @param context
* @param destination
* @throws Exception
@ -60,7 +60,7 @@ public interface PendingMessageCursor extends Service {
/**
* check if a Destination is Empty for this cursor
*
*
* @param destination
* @return true id the Destination is empty
*/
@ -79,7 +79,7 @@ public interface PendingMessageCursor extends Service {
/**
* add message to await dispatch
*
*
* @param node
* @return boolean true if successful, false if cursor traps a duplicate
* @throws IOException
@ -89,9 +89,9 @@ public interface PendingMessageCursor extends Service {
/**
* add message to await dispatch - if it can
*
*
* @param node
* @param maxWaitTime
* @param maxWaitTime
* @return true if successful
* @throws IOException
* @throws Exception
@ -100,7 +100,7 @@ public interface PendingMessageCursor extends Service {
/**
* add message to await dispatch
*
*
* @param node
* @throws Exception
*/
@ -108,7 +108,7 @@ public interface PendingMessageCursor extends Service {
/**
* Add a message recovered from a retroactive policy
*
*
* @param node
* @throws Exception
*/
@ -134,6 +134,8 @@ public interface PendingMessageCursor extends Service {
*/
int size();
long messageSize();
/**
* clear all pending messages
*/
@ -142,7 +144,7 @@ public interface PendingMessageCursor extends Service {
/**
* Informs the Broker if the subscription needs to intervention to recover
* it's state e.g. DurableTopicSubscriber may do
*
*
* @return true if recovery required
*/
boolean isRecoveryRequired();
@ -154,7 +156,7 @@ public interface PendingMessageCursor extends Service {
/**
* Set the max batch size
*
*
* @param maxBatchSize
*/
void setMaxBatchSize(int maxBatchSize);
@ -167,7 +169,7 @@ public interface PendingMessageCursor extends Service {
/**
* remove a node
*
*
* @param node
*/
void remove(MessageReference node);
@ -179,7 +181,7 @@ public interface PendingMessageCursor extends Service {
/**
* Set the UsageManager
*
*
* @param systemUsage
* @see org.apache.activemq.usage.SystemUsage
*/
@ -204,7 +206,7 @@ public interface PendingMessageCursor extends Service {
* @return true if the cursor is full
*/
boolean isFull();
/**
* @return true if the cursor has space to page messages into
*/
@ -217,41 +219,41 @@ public interface PendingMessageCursor extends Service {
/**
* destroy the cursor
*
*
* @throws Exception
*/
void destroy() throws Exception;
/**
* Page in a restricted number of messages and increment the reference count
*
*
* @param maxItems
* @return a list of paged in messages
*/
LinkedList<MessageReference> pageInList(int maxItems);
/**
* set the maximum number of producers to track at one time
* @param value
*/
void setMaxProducersToAudit(int value);
/**
* @return the maximum number of producers to audit
*/
int getMaxProducersToAudit();
/**
* Set the maximum depth of message ids to track
* @param depth
* @param depth
*/
void setMaxAuditDepth(int depth);
/**
* @return the audit depth
*/
int getMaxAuditDepth();
/**
* @return the enableAudit
*/
@ -260,37 +262,37 @@ public interface PendingMessageCursor extends Service {
* @param enableAudit the enableAudit to set
*/
public void setEnableAudit(boolean enableAudit);
/**
* @return true if the underlying state of this cursor
* @return true if the underlying state of this cursor
* disappears when the broker shuts down
*/
public boolean isTransient();
/**
* set the audit
* @param audit
*/
public void setMessageAudit(ActiveMQMessageAudit audit);
/**
* @return the audit - could be null
*/
public ActiveMQMessageAudit getMessageAudit();
/**
* use a cache to improve performance
* @param useCache
*/
public void setUseCache(boolean useCache);
/**
* @return true if a cache may be used
*/
public boolean isUseCache();
/**
* remove from auditing the message id
* @param id

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.util.Map;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.management.SizeStatisticImpl;
/**
*
*
*/
public class PendingMessageHelper {
private final Map<MessageId, PendingNode> map;
private final SizeStatisticImpl messageSize;
public PendingMessageHelper(Map<MessageId, PendingNode> map,
SizeStatisticImpl messageSize) {
super();
this.map = map;
this.messageSize = messageSize;
}
public void addToMap(MessageReference message, PendingNode node) {
PendingNode previous = this.map.put(message.getMessageId(), node);
if (previous != null) {
try {
messageSize.addSize(-previous.getMessage().getSize());
} catch (Exception e) {
//expected for NullMessageReference
}
}
try {
messageSize.addSize(message.getSize());
} catch (Exception e) {
//expected for NullMessageReference
}
}
public PendingNode removeFromMap(MessageReference message) {
PendingNode removed = this.map.remove(message.getMessageId());
if (removed != null) {
try {
messageSize.addSize(-removed.getMessage().getSize());
} catch (Exception e) {
//expected for NullMessageReference
}
}
return removed;
}
}

View File

@ -25,50 +25,64 @@ import java.util.Map;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.management.SizeStatisticImpl;
public class PrioritizedPendingList implements PendingList {
private static final Integer MAX_PRIORITY = 10;
private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
private final SizeStatisticImpl messageSize;
private final PendingMessageHelper pendingMessageHelper;
public PrioritizedPendingList() {
for (int i = 0; i < MAX_PRIORITY; i++) {
this.lists[i] = new OrderedPendingList();
}
messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages");
messageSize.setEnabled(true);
pendingMessageHelper = new PendingMessageHelper(map, messageSize);
}
@Override
public PendingNode addMessageFirst(MessageReference message) {
PendingNode node = getList(message).addMessageFirst(message);
this.map.put(message.getMessageId(), node);
this.pendingMessageHelper.addToMap(message, node);
return node;
}
@Override
public PendingNode addMessageLast(MessageReference message) {
PendingNode node = getList(message).addMessageLast(message);
this.map.put(message.getMessageId(), node);
this.pendingMessageHelper.addToMap(message, node);
return node;
}
@Override
public void clear() {
for (int i = 0; i < MAX_PRIORITY; i++) {
this.lists[i].clear();
}
this.map.clear();
this.messageSize.reset();
}
@Override
public boolean isEmpty() {
return this.map.isEmpty();
}
@Override
public Iterator<MessageReference> iterator() {
return new PrioritizedPendingListIterator();
}
@Override
public PendingNode remove(MessageReference message) {
PendingNode node = null;
if (message != null) {
node = this.map.remove(message.getMessageId());
node = this.pendingMessageHelper.removeFromMap(message);
if (node != null) {
node.getList().removeNode(node);
}
@ -76,10 +90,16 @@ public class PrioritizedPendingList implements PendingList {
return node;
}
@Override
public int size() {
return this.map.size();
}
@Override
public long messageSize() {
return this.messageSize.getTotalSize();
}
@Override
public String toString() {
return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
@ -111,10 +131,12 @@ public class PrioritizedPendingList implements PendingList {
}
}
}
@Override
public boolean hasNext() {
return list.size() > index;
}
@Override
public MessageReference next() {
PendingNode node = list.get(this.index);
this.currentIndex = this.index;
@ -122,10 +144,11 @@ public class PrioritizedPendingList implements PendingList {
return node.getMessage();
}
@Override
public void remove() {
PendingNode node = list.get(this.currentIndex);
if (node != null) {
map.remove(node.getMessage().getMessageId());
pendingMessageHelper.removeFromMap(node.getMessage());
node.getList().removeNode(node);
}
}

View File

@ -96,6 +96,11 @@ public class QueueDispatchPendingList implements PendingList {
return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size();
}
@Override
public long messageSize() {
return pagedInPendingDispatch.messageSize() + redeliveredWaitingDispatch.messageSize();
}
@Override
public Iterator<MessageReference> iterator() {
return new Iterator<MessageReference>() {

View File

@ -32,14 +32,14 @@ import org.slf4j.LoggerFactory;
/**
* persist pending messages pending message (messages awaiting dispatch to a
* consumer) cursor
*
*
*
*
*/
class QueueStorePrefetch extends AbstractStoreCursor {
private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
private final MessageStore store;
private final Broker broker;
/**
* Construct it
* @param queue
@ -51,6 +51,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
}
@Override
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
Message msg = this.store.getMessage(messageReference);
if (msg != null) {
@ -62,36 +63,46 @@ class QueueStorePrefetch extends AbstractStoreCursor {
}
}
@Override
protected synchronized int getStoreSize() {
try {
int result = this.store.getMessageCount();
return result;
} catch (IOException e) {
LOG.error("Failed to get message count", e);
throw new RuntimeException(e);
}
}
@Override
protected synchronized long getStoreMessageSize() {
try {
return this.store.getMessageSize();
} catch (IOException e) {
LOG.error("Failed to get message size", e);
throw new RuntimeException(e);
}
}
@Override
protected synchronized boolean isStoreEmpty() {
try {
return this.store.isEmpty();
} catch (Exception e) {
LOG.error("Failed to get message count", e);
throw new RuntimeException(e);
}
}
@Override
protected void resetBatch() {
this.store.resetBatching();
}
@Override
protected void setBatch(MessageId messageId) throws Exception {
if (LOG.isTraceEnabled()) {
@ -101,7 +112,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
batchResetNeeded = false;
}
@Override
protected void doFillBatch() throws Exception {
hadSpace = this.hasSpace();

View File

@ -302,6 +302,15 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
return pendingCount;
}
@Override
public synchronized long messageSize() {
long pendingSize=0;
for (PendingMessageCursor tsp : storePrefetches) {
pendingSize += tsp.messageSize();
}
return pendingSize;
}
@Override
public void setMaxBatchSize(int newMaxBatchSize) {
for (PendingMessageCursor storePrefetch : storePrefetches) {

View File

@ -51,6 +51,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
currentCursor = persistent;
}
@Override
public synchronized void start() throws Exception {
started = true;
super.start();
@ -73,6 +74,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
pendingCount = persistent.size() + nonPersistent.size();
}
@Override
public synchronized void stop() throws Exception {
started = false;
if (nonPersistent != null) {
@ -87,6 +89,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
pendingCount = 0;
}
@Override
public synchronized boolean addMessageLast(MessageReference node) throws Exception {
boolean result = true;
if (node != null) {
@ -104,6 +107,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
return result;
}
@Override
public synchronized void addMessageFirst(MessageReference node) throws Exception {
if (node != null) {
Message msg = node.getMessage();
@ -119,10 +123,12 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
}
@Override
public synchronized void clear() {
pendingCount = 0;
}
@Override
public synchronized boolean hasNext() {
try {
getNextCursor();
@ -133,11 +139,13 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
return currentCursor != null ? currentCursor.hasNext() : false;
}
@Override
public synchronized MessageReference next() {
MessageReference result = currentCursor != null ? currentCursor.next() : null;
return result;
}
@Override
public synchronized void remove() {
if (currentCursor != null) {
currentCursor.remove();
@ -145,6 +153,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
pendingCount--;
}
@Override
public synchronized void remove(MessageReference node) {
if (!node.isPersistent()) {
nonPersistent.remove(node);
@ -154,18 +163,21 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
pendingCount--;
}
@Override
public synchronized void reset() {
nonPersistent.reset();
persistent.reset();
pendingCount = persistent.size() + nonPersistent.size();
}
@Override
public void release() {
nonPersistent.release();
persistent.release();
}
@Override
public synchronized int size() {
if (pendingCount < 0) {
pendingCount = persistent.size() + nonPersistent.size();
@ -173,6 +185,12 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
return pendingCount;
}
@Override
public synchronized long messageSize() {
return persistent.messageSize() + nonPersistent.messageSize();
}
@Override
public synchronized boolean isEmpty() {
// if negative, more messages arrived in store since last reset so non empty
return pendingCount == 0;
@ -185,6 +203,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
* @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
@Override
public boolean isRecoveryRequired() {
return false;
}
@ -203,6 +222,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
this.nonPersistent = nonPersistent;
}
@Override
public void setMaxBatchSize(int maxBatchSize) {
persistent.setMaxBatchSize(maxBatchSize);
if (nonPersistent != null) {
@ -212,6 +232,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
@Override
public void setMaxProducersToAudit(int maxProducersToAudit) {
super.setMaxProducersToAudit(maxProducersToAudit);
if (persistent != null) {
@ -222,6 +243,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
}
@Override
public void setMaxAuditDepth(int maxAuditDepth) {
super.setMaxAuditDepth(maxAuditDepth);
if (persistent != null) {
@ -232,6 +254,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
}
@Override
public void setEnableAudit(boolean enableAudit) {
super.setEnableAudit(enableAudit);
if (persistent != null) {
@ -266,6 +289,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
@Override
public synchronized void gc() {
if (persistent != null) {
persistent.gc();
@ -276,6 +300,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
pendingCount = persistent.size() + nonPersistent.size();
}
@Override
public void setSystemUsage(SystemUsage usageManager) {
super.setSystemUsage(usageManager);
if (persistent != null) {

View File

@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
/**
* persist pendingCount messages pendingCount message (messages awaiting disptach
* to a consumer) cursor
*
*
*
*
*/
class TopicStorePrefetch extends AbstractStoreCursor {
private static final Logger LOG = LoggerFactory.getLogger(TopicStorePrefetch.class);
@ -59,14 +59,17 @@ class TopicStorePrefetch extends AbstractStoreCursor {
this.storeHasMessages=this.size > 0;
}
@Override
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
// shouldn't get called
throw new RuntimeException("Not supported");
}
@Override
public synchronized void addMessageFirst(MessageReference node) throws Exception {
batchList.addMessageFirst(node);
size++;
//this.messageSize.addSize(node.getMessage().getSize());
}
@Override
@ -88,7 +91,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
}
storeHasMessages = true;
}
return recovered;
return recovered;
}
@Override
@ -100,7 +103,18 @@ class TopicStorePrefetch extends AbstractStoreCursor {
throw new RuntimeException(e);
}
}
@Override
protected synchronized long getStoreMessageSize() {
try {
return store.getMessageSize(clientId, subscriberName);
} catch (Exception e) {
LOG.error("{} Failed to get the outstanding message count from the store", this, e);
throw new RuntimeException(e);
}
}
@Override
protected synchronized boolean isStoreEmpty() {
try {
@ -111,7 +125,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
}
}
@Override
protected void resetBatch() {
this.store.resetBatching(clientId, subscriberName);

View File

@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
@ -28,13 +29,13 @@ import org.apache.activemq.broker.region.QueueMessageReference;
/**
* hold pending messages in a linked list (messages awaiting disptach to a
* consumer) cursor
*
*
*
*
*/
public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
private final PendingList list;
private Iterator<MessageReference> iter;
public VMPendingMessageCursor(boolean prioritizedMessages) {
super(prioritizedMessages);
if (this.prioritizedMessages) {
@ -44,7 +45,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
}
}
@Override
public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
throws Exception {
List<MessageReference> rc = new ArrayList<MessageReference>();
@ -62,7 +64,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* @return true if there are no pending messages
*/
@Override
public synchronized boolean isEmpty() {
if (list.isEmpty()) {
return true;
@ -85,7 +88,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* reset the cursor
*/
@Override
public synchronized void reset() {
iter = list.iterator();
last = null;
@ -93,10 +97,11 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* add message to await dispatch
*
*
* @param node
*/
@Override
public synchronized boolean addMessageLast(MessageReference node) {
node.incrementReferenceCount();
list.addMessageLast(node);
@ -105,10 +110,11 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* add message to await dispatch
*
*
* @param node
*/
@Override
public synchronized void addMessageFirst(MessageReference node) {
node.incrementReferenceCount();
list.addMessageFirst(node);
@ -117,7 +123,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* @return true if there pending messages to dispatch
*/
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@ -125,7 +132,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* @return the next pending message
*/
@Override
public synchronized MessageReference next() {
last = iter.next();
if (last != null) {
@ -137,7 +145,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* remove the message at the cursor position
*/
@Override
public synchronized void remove() {
if (last != null) {
last.decrementReferenceCount();
@ -148,15 +157,22 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* @return the number of pending messages
*/
@Override
public synchronized int size() {
return list.size();
}
@Override
public synchronized long messageSize() {
return list.messageSize();
}
/**
* clear all pending messages
*/
@Override
public synchronized void clear() {
for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
MessageReference ref = i.next();
@ -165,7 +181,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
list.clear();
}
@Override
public synchronized void remove(MessageReference node) {
list.remove(node);
node.decrementReferenceCount();
@ -173,11 +190,12 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
/**
* Page in a restricted number of messages
*
*
* @param maxItems
* @return a list of paged in messages
*/
@Override
public LinkedList<MessageReference> pageInList(int maxItems) {
LinkedList<MessageReference> result = new LinkedList<MessageReference>();
for (Iterator<MessageReference>i = list.iterator();i.hasNext();) {
@ -191,12 +209,14 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
return result;
}
@Override
public boolean isTransient() {
return true;
}
@Override
public void destroy() throws Exception {
super.destroy();
clear();

View File

@ -41,6 +41,8 @@ public interface PList {
long size();
long messageSize();
public interface PListIterator extends Iterator<PListEntry> {
void release();
}

View File

@ -209,6 +209,7 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
return delegate.isPrioritizedMessages();
}
@Override
public void updateMessage(Message message) throws IOException {
delegate.updateMessage(message);
}
@ -223,4 +224,13 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
return delegate.getMessageStoreStatistics();
}
/* (non-Javadoc)
* @see org.apache.activemq.store.TopicMessageStore#getMessageSize(java.lang.String, java.lang.String)
*/
@Override
public long getMessageSize(String clientId, String subscriberName)
throws IOException {
return delegate.getMessageSize(clientId, subscriberName);
}
}

View File

@ -102,6 +102,8 @@ public interface TopicMessageStore extends MessageStore {
*/
int getMessageCount(String clientId, String subscriberName) throws IOException;
long getMessageSize(String clientId, String subscriberName) throws IOException;
/**
* Finds the subscriber entry for the given consumer info
*

View File

@ -145,6 +145,16 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
return result;
}
@Override
public synchronized long getMessageSize(String clientId, String subscriberName) throws IOException {
long result = 0;
MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
if (sub != null) {
result = sub.messageSize();
}
return result;
}
@Override
public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));

View File

@ -26,8 +26,8 @@ import org.apache.activemq.store.MessageRecoveryListener;
/**
* A holder for a durable subscriber
*
*
*
*
*/
class MemoryTopicSub {
@ -58,9 +58,20 @@ class MemoryTopicSub {
return map.size();
}
synchronized long messageSize() {
long messageSize = 0;
for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator(); iter.hasNext();) {
Entry<MessageId, Message> entry = iter.next();
messageSize += entry.getValue().getSize();
}
return messageSize;
}
synchronized void recoverSubscription(MessageRecoveryListener listener) throws Exception {
for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Entry)iter.next();
for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator(); iter.hasNext();) {
Entry<MessageId, Message> entry = iter.next();
Object msg = entry.getValue();
if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId)msg);
@ -76,8 +87,8 @@ class MemoryTopicSub {
// the message table is a synchronizedMap - so just have to synchronize
// here
int count = 0;
for (Iterator iter = map.entrySet().iterator(); iter.hasNext() && count < maxReturned;) {
Map.Entry entry = (Entry)iter.next();
for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator(); iter.hasNext() && count < maxReturned;) {
Entry<MessageId, Message> entry = iter.next();
if (pastLackBatch) {
count++;
Object msg = entry.getValue();

View File

@ -44,7 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*
*/
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
@ -57,7 +57,8 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() {
protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
@Override
protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
return size() > SEQUENCE_ID_CACHE_SIZE;
}
};
@ -67,6 +68,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
super(persistenceAdapter, adapter, wireFormat, topic, audit);
}
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
if (ack != null && ack.isUnmatchedAck()) {
if (LOG.isTraceEnabled()) {
@ -110,16 +112,19 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
/**
* @throws Exception
*/
@Override
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
@Override
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
return listener.recoverMessage(msg);
}
@Override
public boolean recoverMessageReference(String reference) throws Exception {
return listener.recoverMessageReference(new MessageId(reference));
}
@ -149,16 +154,19 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
}
@Override
public String toString() {
return Arrays.deepToString(perPriority);
}
@Override
public Iterator<LastRecoveredEntry> iterator() {
return new PriorityIterator();
}
class PriorityIterator implements Iterator<LastRecoveredEntry> {
int current = 9;
@Override
public boolean hasNext() {
for (int i=current; i>=0; i--) {
if (perPriority[i].hasMessages()) {
@ -169,10 +177,12 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
return false;
}
@Override
public LastRecoveredEntry next() {
return perPriority[current];
}
@Override
public void remove() {
throw new RuntimeException("not implemented");
}
@ -188,6 +198,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
this.priority = priority;
}
@Override
public String toString() {
return priority + "-" + stored + ":" + recovered;
}
@ -213,6 +224,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
this.maxMessages = maxMessages;
}
@Override
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
if (delegate.hasSpace() && recoveredCount < maxMessages) {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
@ -226,6 +238,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
return false;
}
@Override
public boolean recoverMessageReference(String reference) throws Exception {
return delegate.recoverMessageReference(new MessageId(reference));
}
@ -244,6 +257,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
@Override
public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
throws Exception {
//Duration duration = new Duration("recoverNextMessages");
@ -253,7 +267,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
if (!subscriberLastRecoveredMap.containsKey(key)) {
subscriberLastRecoveredMap.put(key, new LastRecovered());
}
final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);
final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);
LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
try {
if (LOG.isTraceEnabled()) {
@ -293,6 +307,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
@Override
public void resetBatching(String clientId, String subscriptionName) {
String key = getSubscriptionKey(clientId, subscriptionName);
if (!pendingCompletion.contains(key)) {
@ -330,6 +345,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
@Override
public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
@ -347,6 +363,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
* @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
* String)
*/
@Override
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
@ -359,6 +376,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
@Override
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
@ -372,6 +390,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
@Override
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
@ -384,6 +403,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
@Override
public int getMessageCount(String clientId, String subscriberName) throws IOException {
//Duration duration = new Duration("getMessageCount");
int result = 0;
@ -403,6 +423,11 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
return result;
}
@Override
public long getMessageSize(String clientId, String subscriberName) throws IOException {
return 0;
}
protected String getSubscriptionKey(String clientId, String subscriberName) {
String result = clientId + ":";
result += subscriberName != null ? subscriberName : "NOT_SET";

View File

@ -38,8 +38,8 @@ import org.slf4j.LoggerFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
*
*
*
*/
public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
@ -54,12 +54,14 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
this.longTermStore = checkpointStore;
}
@Override
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
throws Exception {
this.peristenceAdapter.checkpoint(true, true);
longTermStore.recoverSubscription(clientId, subscriptionName, listener);
}
@Override
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
MessageRecoveryListener listener) throws Exception {
this.peristenceAdapter.checkpoint(true, true);
@ -67,21 +69,25 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
}
@Override
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return longTermStore.lookupSubscription(clientId, subscriptionName);
}
@Override
public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
this.peristenceAdapter.checkpoint(true, true);
longTermStore.addSubscription(subscriptionInfo, retroactive);
}
@Override
public void addMessage(ConnectionContext context, Message message) throws IOException {
super.addMessage(context, message);
}
/**
*/
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
final MessageId messageId, MessageAck originalAck) throws IOException {
final boolean debug = LOG.isDebugEnabled();
@ -111,6 +117,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
}
transactionStore.acknowledge(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception {
if (debug) {
LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
@ -121,6 +128,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
}
}
@Override
public void afterRollback() throws Exception {
if (debug) {
LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
@ -159,6 +167,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
}
}
@Override
public RecordLocation checkpoint() throws IOException {
final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
@ -170,6 +179,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
}
return super.checkpoint(new Callback() {
@Override
public void execute() throws Exception {
// Checkpoint the acknowledged messages.
@ -193,19 +203,29 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
return longTermStore;
}
@Override
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
longTermStore.deleteSubscription(clientId, subscriptionName);
}
@Override
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return longTermStore.getAllSubscriptions();
}
@Override
public int getMessageCount(String clientId, String subscriberName) throws IOException {
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount(clientId, subscriberName);
}
@Override
public long getMessageSize(String clientId, String subscriberName) throws IOException {
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageSize(clientId, subscriberName);
}
@Override
public void resetBatching(String clientId, String subscriptionName) {
longTermStore.resetBatching(clientId, subscriptionName);
}

View File

@ -892,6 +892,30 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
@Override
public long getMessageSize(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
indexLock.writeLock().lock();
try {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
@Override
public Integer execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
if (cursorPos == null) {
// The subscription might not exist.
return 0;
}
return (int) getStoredMessageSize(tx, sd, subscriptionKey);
}
});
} finally {
indexLock.writeLock().unlock();
}
}
@Override
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
throws Exception {

View File

@ -2536,6 +2536,32 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return 0;
}
public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
long locationSize = 0;
if (messageSequences != null) {
Iterator<Long> sequences = messageSequences.iterator();
while (sequences.hasNext()) {
Long sequenceId = sequences.next();
//the last item is the next marker
if (!sequences.hasNext()) {
break;
}
Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx);
while (iterator.hasNext()) {
Entry<Location, Long> entry = iterator.next();
if (entry.getValue() == sequenceId - 1) {
locationSize += entry.getKey().getSize();
break;
}
}
}
}
return locationSize;
}
protected String key(KahaDestination destination) {
return destination.getType().getNumber() + ":" + destination.getName();
}

View File

@ -408,6 +408,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
}
@Override
public long getMessageSize(String clientId, String subscriptionName) throws IOException {
return 0;
}
@Override
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -59,6 +60,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
this(pageFile, page.getPageId());
}
@Override
synchronized public void load(Transaction tx) throws IOException {
if (loaded.compareAndSet(false, true)) {
LOG.debug("loading");
@ -81,15 +83,22 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
ListNode<Key, Value> node = loadNode(tx, getHeadPageId());
setTailPageId(getHeadPageId());
size.addAndGet(node.size(tx));
onLoad(node, tx);
while (node.getNext() != NOT_SET ) {
node = loadNode(tx, node.getNext());
size.addAndGet(node.size(tx));
onLoad(node, tx);
setTailPageId(node.getPageId());
}
}
}
}
protected void onLoad(ListNode<Key, Value> node, Transaction tx) {
}
@Override
synchronized public void unload(Transaction tx) {
if (loaded.compareAndSet(true, false)) {
}
@ -103,6 +112,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
return loadNode(tx, getTailPageId());
}
@Override
synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
assertLoaded();
@ -123,6 +133,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
private Map.Entry<Key, Value> lastGetEntryCache = null;
private WeakReference<Transaction> lastCacheTxSrc = new WeakReference<Transaction>(null);
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
synchronized public Value get(Transaction tx, Key key) throws IOException {
assertLoaded();
@ -144,6 +155,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
*
* @return the old value contained in the list if one exists or null.
*/
@Override
@SuppressWarnings({ "rawtypes" })
synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
@ -211,6 +223,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
return null;
}
@Override
@SuppressWarnings("rawtypes")
synchronized public Value remove(Transaction tx, Key key) throws IOException {
assertLoaded();
@ -252,15 +265,17 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
return null;
}
public void onRemove() {
public void onRemove(Entry<Key, Value> removed) {
size.decrementAndGet();
flushCache();
}
@Override
public boolean isTransient() {
return false;
}
@Override
synchronized public void clear(Transaction tx) throws IOException {
for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) {
ListNode<Key,Value>candidate = iterator.next();
@ -280,6 +295,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
return getHead(tx).isEmpty(tx);
}
@Override
synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
return getHead(tx).iterator(tx);
}
@ -346,6 +362,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
public Marshaller<Key> getKeyMarshaller() {
return keyMarshaller;
}
@Override
public void setKeyMarshaller(Marshaller<Key> keyMarshaller) {
this.keyMarshaller = keyMarshaller;
}
@ -353,6 +370,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
public Marshaller<Value> getValueMarshaller() {
return valueMarshaller;
}
@Override
public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
this.valueMarshaller = valueMarshaller;
}

View File

@ -66,14 +66,17 @@ public final class ListNode<Key, Value> {
this.value = value;
}
@Override
public Key getKey() {
return key;
}
@Override
public Value getValue() {
return value;
}
@Override
public Value setValue(Value value) {
Value oldValue = this.value;
this.value = value;
@ -98,10 +101,12 @@ public final class ListNode<Key, Value> {
index = current.getContainingList();
}
@Override
public boolean hasNext() {
return nextEntry != null;
}
@Override
public ListNode<Key, Value> next() {
ListNode<Key, Value> current = nextEntry;
if (current != null) {
@ -121,6 +126,7 @@ public final class ListNode<Key, Value> {
return current;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@ -171,6 +177,7 @@ public final class ListNode<Key, Value> {
return result;
}
@Override
public boolean hasNext() {
if (nextEntry == null) {
nextEntry = getFromNextNode();
@ -178,6 +185,7 @@ public final class ListNode<Key, Value> {
return nextEntry != null;
}
@Override
public Entry<Key, Value> next() {
if (nextEntry != null) {
entryToRemove = nextEntry;
@ -188,6 +196,7 @@ public final class ListNode<Key, Value> {
}
}
@Override
public void remove() {
if (entryToRemove == null) {
throw new IllegalStateException("can only remove once, call hasNext();next() again");
@ -228,7 +237,7 @@ public final class ListNode<Key, Value> {
currentNode = previousNode;
}
}
targetList.onRemove();
targetList.onRemove(entryToRemove);
if (toRemoveNode != null) {
tx.free(toRemoveNode.getPage());
@ -262,6 +271,7 @@ public final class ListNode<Key, Value> {
this.valueMarshaller = valueMarshaller;
}
@Override
public void writePayload(ListNode<Key, Value> node, DataOutput os) throws IOException {
os.writeLong(node.next);
short count = (short) node.entries.size(); // cast may truncate
@ -279,6 +289,7 @@ public final class ListNode<Key, Value> {
}
}
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public ListNode<Key, Value> readPayload(DataInput is) throws IOException {
ListNode<Key, Value> node = new ListNode<Key, Value>();

View File

@ -21,22 +21,22 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
import org.apache.activemq.management.SizeStatisticImpl;
import org.apache.activemq.store.PList;
import org.apache.activemq.store.PListEntry;
import org.apache.activemq.store.kahadb.disk.index.ListIndex;
import org.apache.activemq.store.kahadb.disk.index.ListNode;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.util.ByteSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,6 +45,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final PListStoreImpl store;
private String name;
Object indexLock;
private final SizeStatisticImpl messageSize;
PListImpl(PListStoreImpl store) {
this.store = store;
@ -52,6 +53,9 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
setPageFile(store.getPageFile());
setKeyMarshaller(StringMarshaller.INSTANCE);
setValueMarshaller(LocationMarshaller.INSTANCE);
messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages");
messageSize.setEnabled(true);
}
public void setName(String name) {
@ -75,6 +79,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
public synchronized void destroy() throws IOException {
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
clear(tx);
unload(tx);
@ -100,6 +105,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
add(tx, id, location);
}
@ -113,6 +119,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
addFirst(tx, id, location);
}
@ -133,6 +140,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
result.set(remove(tx, id) != null);
}
@ -145,6 +153,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
if (iterator.hasNext()) {
@ -165,6 +174,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
ref.set(iterator.next());
@ -183,6 +193,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
ref.set(getFirst(tx));
}
@ -200,6 +211,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
ref.set(getLast(tx));
}
@ -270,6 +282,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
}
}
@Override
public void release() {
try {
tx.rollback();
@ -285,6 +298,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
synchronized (indexLock) {
if (loaded.get()) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
while (iterator.hasNext()) {
@ -297,6 +311,51 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
}
}
@Override
public long messageSize() {
return messageSize.getTotalSize();
}
@Override
public synchronized Location add(Transaction tx, String key, Location value)
throws IOException {
Location location = super.add(tx, key, value);
messageSize.addSize(value.getSize());
return location;
}
@Override
public synchronized Location addFirst(Transaction tx, String key,
Location value) throws IOException {
Location location = super.addFirst(tx, key, value);
messageSize.addSize(value.getSize());
return location;
}
@Override
public synchronized void clear(Transaction tx) throws IOException {
messageSize.reset();
super.clear(tx);
}
@Override
protected synchronized void onLoad(ListNode<String, Location> node, Transaction tx) {
try {
Iterator<Entry<String, Location>> i = node.iterator(tx);
while (i.hasNext()) {
messageSize.addSize(i.next().getValue().getSize());
}
} catch (IOException e) {
LOG.warn("could not increment message size", e);
}
}
@Override
public void onRemove(Entry<String, Location> removed) {
super.onRemove(removed);
messageSize.addSize(-removed.getValue().getSize());
}
@Override
public String toString() {
return name + "[headPageId=" + getHeadPageId() + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";

View File

@ -1008,6 +1008,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
case None => 0
}
}
def getMessageSize(clientId: String, subscriptionName: String): Long = {
check_running
return 0
}
}
class LevelDBPList(val name: String, val key: Long) extends PList {
@ -1066,6 +1071,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def isEmpty = size()==0
def size(): Long = listSize.get()
def messageSize(): Long = 0
def iterator() = new PListIterator() {
check_running

View File

@ -227,6 +227,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
return 0;
}
@Override
public long getPendingMessageSize() {
return 0;
}
@Override
public int getPrefetchSize() {
return 0;
@ -354,10 +359,12 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
return 0;
}
@Override
public void incrementConsumedCount(){
}
@Override
public void resetConsumedCount(){
}

View File

@ -102,6 +102,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
public void testNoDispatchToRemovedConsumers() throws Exception {
final AtomicInteger producerId = new AtomicInteger();
Runnable sender = new Runnable() {
@Override
public void run() {
AtomicInteger id = new AtomicInteger();
int producerIdAndIncrement = producerId.getAndIncrement();
@ -120,6 +121,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
};
Runnable subRemover = new Runnable() {
@Override
public void run() {
for (Subscription sub : subs) {
try {
@ -177,10 +179,12 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
List<MessageReference> dispatched =
Collections.synchronizedList(new ArrayList<MessageReference>());
@Override
public void acknowledge(ConnectionContext context, MessageAck ack)
throws Exception {
}
@Override
public void add(MessageReference node) throws Exception {
// immediate dispatch
QueueMessageReference qmr = (QueueMessageReference)node;
@ -188,6 +192,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
dispatched.add(qmr);
}
@Override
public ConnectionContext getContext() {
return null;
}
@ -228,76 +233,100 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
public void resetConsumedCount() {
}
@Override
public void add(ConnectionContext context, Destination destination)
throws Exception {
}
@Override
public void destroy() {
}
@Override
public void gc() {
}
@Override
public ConsumerInfo getConsumerInfo() {
return info;
}
@Override
public long getDequeueCounter() {
return 0;
}
@Override
public long getDispatchedCounter() {
return 0;
}
@Override
public int getDispatchedQueueSize() {
return 0;
}
@Override
public long getEnqueueCounter() {
return 0;
}
@Override
public int getInFlightSize() {
return 0;
}
@Override
public int getInFlightUsage() {
return 0;
}
@Override
public ObjectName getObjectName() {
return null;
}
@Override
public int getPendingQueueSize() {
return 0;
}
@Override
public long getPendingMessageSize() {
return 0;
}
@Override
public int getPrefetchSize() {
return 0;
}
@Override
public String getSelector() {
return null;
}
@Override
public boolean isBrowser() {
return false;
}
@Override
public boolean isFull() {
return false;
}
@Override
public boolean isHighWaterMark() {
return false;
}
@Override
public boolean isLowWaterMark() {
return false;
}
@Override
public boolean isRecoveryRequired() {
return false;
}
@ -306,19 +335,23 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return false;
}
@Override
public boolean matches(MessageReference node,
MessageEvaluationContext context) throws IOException {
return true;
}
@Override
public boolean matches(ActiveMQDestination destination) {
return false;
}
@Override
public void processMessageDispatchNotification(
MessageDispatchNotification mdn) throws Exception {
}
@Override
public Response pullMessage(ConnectionContext context, MessagePull pull)
throws Exception {
return null;
@ -329,34 +362,42 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return false;
}
@Override
public List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {
return new ArrayList<MessageReference>(dispatched);
}
@Override
public void setObjectName(ObjectName objectName) {
}
@Override
public void setSelector(String selector)
throws InvalidSelectorException, UnsupportedOperationException {
}
@Override
public void updateConsumerPrefetch(int newPrefetch) {
}
@Override
public boolean addRecoveredMessage(ConnectionContext context,
MessageReference message) throws Exception {
return false;
}
@Override
public ActiveMQDestination getActiveMQDestination() {
return null;
}
@Override
public int getLockPriority() {
return 0;
}
@Override
public boolean isLockExclusive() {
return false;
}
@ -367,6 +408,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
public void removeDestination(Destination destination) {
}
@Override
public int countBeforeFull() {
return 10;
}

View File

@ -0,0 +1,547 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.AbstractStoreStatTestSupport;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This test checks that KahaDB properly sets the new storeMessageSize statistic.
*
* AMQ-5748
*
*/
public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStatTestSupport {
protected static final Logger LOG = LoggerFactory
.getLogger(AbstractPendingMessageCursorTest.class);
protected BrokerService broker;
protected URI brokerConnectURI;
protected String defaultQueueName = "test.queue";
protected String defaultTopicName = "test.topic";
protected static int maxMessageSize = 1000;
@Before
public void startBroker() throws Exception {
setUpBroker(true);
}
protected void setUpBroker(boolean clearDataDir) throws Exception {
broker = new BrokerService();
this.initPersistence(broker);
//set up a transport
TransportConnector connector = broker
.addConnector(new TransportConnector());
connector.setUri(new URI("tcp://0.0.0.0:0"));
connector.setName("tcp");
PolicyEntry policy = new PolicyEntry();
policy.setTopicPrefetch(100);
policy.setDurableTopicPrefetch(100);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
broker.start();
broker.waitUntilStarted();
brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
}
@After
public void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
@Override
protected BrokerService getBroker() {
return this.broker;
}
@Override
protected URI getBrokerConnectURI() {
return this.brokerConnectURI;
}
protected abstract void initPersistence(BrokerService brokerService) throws IOException;
@Test
public void testQueueMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
verifyPendingStats(dest, 200, publishedMessageSize.get());
verifyStoreStats(dest, 200, publishedMessageSize.get());
}
@Test
public void testQueueBrowserMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
browseTestQueueMessages(dest.getName());
verifyPendingStats(dest, 200, publishedMessageSize.get());
verifyStoreStats(dest, 200, publishedMessageSize.get());
}
@Test
public void testQueueMessageSizeNonPersistent() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200,
DeliveryMode.NON_PERSISTENT, publishedMessageSize);
verifyPendingStats(dest, 200, publishedMessageSize.get());
}
@Test
public void testQueueMessageSizePersistentAndNonPersistent() throws Exception {
AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
AtomicLong publishedMessageSize = new AtomicLong();
org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(100,
DeliveryMode.PERSISTENT, publishedMessageSize);
dest = publishTestQueueMessages(100,
DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize);
verifyPendingStats(dest, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get());
verifyStoreStats(dest, 100, publishedMessageSize.get());
}
@Test
public void testQueueMessageSizeAfterConsumption() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
verifyPendingStats(dest, 200, publishedMessageSize.get());
consumeTestQueueMessages();
verifyPendingStats(dest, 0, 0);
verifyStoreStats(dest, 0, 0);
}
@Test
public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
verifyPendingStats(dest, 200, publishedMessageSize.get());
consumeTestQueueMessages();
verifyPendingStats(dest, 0, 0);
verifyStoreStats(dest, 0, 0);
}
@Test(timeout=100000)
public void testTopicMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName));
org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(200, publishedMessageSize);
//verify the count and size - there is a prefetch of 100 so only 100 are pending and 100
//are dispatched because we have an active consumer online
//verify that the size is greater than 100 messages times the minimum size of 100
verifyPendingStats(dest, 100, 100 * 100);
//consume all messages
consumeTestMessages(consumer, 200);
//All messages should now be gone
verifyPendingStats(dest, 0, 0);
connection.close();
}
@Test(timeout=100000)
public void testTopicNonPersistentMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName));
org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(200,
DeliveryMode.NON_PERSISTENT, publishedMessageSize);
//verify the count and size - there is a prefetch of 100 so only 100 are pending and 100
//are dispatched because we have an active consumer online
//verify the size is at least as big as 100 messages times the minimum of 100 size
verifyPendingStats(dest, 100, 100 * 100);
//consume all messages
consumeTestMessages(consumer, 200);
//All messages should now be gone
verifyPendingStats(dest, 0, 0);
connection.close();
}
@Test(timeout=100000)
public void testTopicPersistentAndNonPersistentMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName));
org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(100,
DeliveryMode.NON_PERSISTENT, publishedMessageSize);
dest = publishTestTopicMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize);
//verify the count and size - there is a prefetch of 100 so only 100 are pending and 100
//are dispatched because we have an active consumer online
//verify the size is at least as big as 100 messages times the minimum of 100 size
verifyPendingStats(dest, 100, 100 * 100);
//consume all messages
consumeTestMessages(consumer, 200);
//All messages should now be gone
verifyPendingStats(dest, 0, 0);
connection.close();
}
@Test(timeout=10000)
public void testMessageSizeOneDurable() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(connection,
new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
//verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
verifyStoreStats(dest, 200, publishedMessageSize.get());
//consume all messages
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
//All messages should now be gone
verifyPendingStats(dest, subKey, 0, 0);
verifyStoreStats(dest, 0, 0);
connection.close();
}
@Test(timeout=10000)
public void testMessageSizeOneDurablePartialConsumption() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(
connection, new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
//verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
verifyStoreStats(dest, 200, publishedMessageSize.get());
//consume all messages
consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
//150 should be left
verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
verifyStoreStats(dest, 150, publishedMessageSize.get());
connection.close();
}
@Test(timeout=10000)
public void testMessageSizeTwoDurables() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
org.apache.activemq.broker.region.Topic dest =
publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200,
publishedMessageSize, DeliveryMode.PERSISTENT);
//verify the count and size
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
//consume messages just for sub1
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
//There is still a durable that hasn't consumed so the messages should exist
SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
verifyPendingStats(dest, subKey, 0, 0);
verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
verifyStoreStats(dest, 200, publishedMessageSize.get());
connection.stop();
}
protected void verifyPendingStats(final org.apache.activemq.broker.region.Queue queue,
final int count, final long minimumSize) throws Exception {
this.verifyPendingStats(queue, count, minimumSize, count, minimumSize);
}
protected void verifyPendingStats(final org.apache.activemq.broker.region.Queue queue,
final int count, final long minimumSize, final int storeCount, final long minimumStoreSize) throws Exception {
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return queue.getPendingMessageCount() == count;
}
});
verifySize(count, new MessageSizeCalculator() {
@Override
public long getMessageSize() throws Exception {
return queue.getPendingMessageSize();
}
}, minimumSize);
}
//For a non-durable there won't necessarily be a message store
protected void verifyPendingStats(org.apache.activemq.broker.region.Topic topic,
final int count, final long minimumSize) throws Exception {
final TopicSubscription sub = (TopicSubscription) topic.getConsumers().get(0);
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return sub.getPendingQueueSize() == count;
}
});
verifySize(count, new MessageSizeCalculator() {
@Override
public long getMessageSize() throws Exception {
return sub.getPendingMessageSize();
}
}, minimumSize);
}
protected void verifyPendingStats(org.apache.activemq.broker.region.Topic topic, SubscriptionKey subKey,
final int count, final long minimumSize) throws Exception {
final DurableTopicSubscription sub = topic.getDurableTopicSubs().get(subKey);
//verify message count
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return sub.getPendingQueueSize() == count;
}
});
//verify message size
verifySize(count, new MessageSizeCalculator() {
@Override
public long getMessageSize() throws Exception {
return sub.getPendingMessageSize();
}
}, minimumSize);
}
protected void verifyStoreStats(org.apache.activemq.broker.region.Destination dest,
final int storeCount, final long minimumStoreSize) throws Exception {
final MessageStore messageStore = dest.getMessageStore();
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return messageStore.getMessageCount() == storeCount;
}
});
verifySize(storeCount, new MessageSizeCalculator() {
@Override
public long getMessageSize() throws Exception {
return messageStore.getMessageSize();
}
}, minimumStoreSize);
}
protected void verifySize(final int count, final MessageSizeCalculator messageSizeCalculator,
final long minimumSize) throws Exception {
if (count > 0) {
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return messageSizeCalculator.getMessageSize() > minimumSize ;
}
});
} else {
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return messageSizeCalculator.getMessageSize() == 0;
}
});
}
}
protected static interface MessageSizeCalculator {
long getMessageSize() throws Exception;
}
protected Destination consumeTestMessages(MessageConsumer consumer, int size) throws Exception {
return consumeTestMessages(consumer, size, defaultTopicName);
}
protected Destination consumeTestMessages(MessageConsumer consumer, int size, String topicName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
topicName);
Destination dest = broker.getDestination(activeMqTopic);
//Topic topic = session.createTopic(topicName);
try {
for (int i = 0; i < size; i++) {
consumer.receive();
}
} finally {
//session.close();
}
return dest;
}
protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, AtomicLong publishedMessageSize) throws Exception {
return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
}
protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection,
String[] subNames, int publishSize, AtomicLong publishedMessageSize, int deliveryMode) throws Exception {
return publishTestMessagesDurable(connection, subNames, defaultTopicName,
publishSize, 0, AbstractStoreStatTestSupport.defaultMessageSize,
publishedMessageSize, false, deliveryMode);
}
protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize,
AtomicLong publishedMessageSize) throws Exception {
return publishTestTopicMessages(publishSize, DeliveryMode.PERSISTENT, publishedMessageSize);
}
protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize,
int deliveryMode, AtomicLong publishedMessageSize) throws Exception {
// create a new queue
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId2");
connection.start();
final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
defaultTopicName);
org.apache.activemq.broker.region.Topic dest =
(org.apache.activemq.broker.region.Topic) broker.getDestination(activeMqTopic);
// Start the connection
Session session = connection.createSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(defaultTopicName);
try {
// publish a bunch of non-persistent messages to fill up the temp
// store
MessageProducer prod = session.createProducer(topic);
prod.setDeliveryMode(deliveryMode);
for (int i = 0; i < publishSize; i++) {
prod.send(createMessage(session, AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize));
}
} finally {
connection.close();
}
return dest;
}
protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count,
AtomicLong publishedMessageSize) throws Exception {
return publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize);
}
protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count, int deliveryMode,
AtomicLong publishedMessageSize) throws Exception {
return publishTestQueueMessages(count, defaultQueueName, deliveryMode,
AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize);
}
protected Destination consumeTestQueueMessages() throws Exception {
return consumeTestQueueMessages(defaultQueueName);
}
}

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.io.FileUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This test checks that pending message metrics work properly with KahaDB
*
* AMQ-5923
*
*/
public class KahaDBPendingMessageCursorTest extends
AbstractPendingMessageCursorTest {
protected static final Logger LOG = LoggerFactory
.getLogger(KahaDBPendingMessageCursorTest.class);
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
@Override
protected void setUpBroker(boolean clearDataDir) throws Exception {
if (clearDataDir && dataFileDir.getRoot().exists())
FileUtils.cleanDirectory(dataFileDir.getRoot());
super.setUpBroker(clearDataDir);
}
@Override
protected void initPersistence(BrokerService brokerService)
throws IOException {
broker.setPersistent(true);
broker.setDataDirectoryFile(dataFileDir.getRoot());
}
/**
* Test that the the counter restores size and works after restart and more
* messages are published
*
* @throws Exception
*/
@Test(timeout=10000)
public void testDurableMessageSizeAfterRestartAndPublish() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
Topic topic = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200,
publishedMessageSize, DeliveryMode.PERSISTENT);
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
// verify the count and size
verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
verifyStoreStats(topic, 200, publishedMessageSize.get());
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
topic = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200,
publishedMessageSize, DeliveryMode.PERSISTENT);
// verify the count and size
verifyPendingStats(topic, subKey, 400, publishedMessageSize.get());
verifyStoreStats(topic, 400, publishedMessageSize.get());
}
/**
* Test that the the counter restores size and works after restart and more
* messages are published
*
* @throws Exception
*/
@Test(timeout=10000)
public void testNonPersistentDurableMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
Topic topic = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200,
publishedMessageSize, DeliveryMode.NON_PERSISTENT);
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
// verify the count and size
verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
verifyStoreStats(topic, 0, 0);
}
}

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.util.SubscriptionKey;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This test checks that PendingMessageCursor size statistics work with the MemoryPersistentAdapter
*
* AMQ-5748
*
*/
public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursorTest {
protected static final Logger LOG = LoggerFactory
.getLogger(MemoryPendingMessageCursorTest.class);
@Override
protected void initPersistence(BrokerService brokerService) throws IOException {
broker.setPersistent(false);
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
}
@Override
@Test(timeout=10000)
public void testMessageSizeOneDurable() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
org.apache.activemq.broker.region.Topic dest =
publishTestMessagesDurable(connection, new String[] {"sub1"},
200, publishedMessageSize, DeliveryMode.PERSISTENT);
verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
//The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100
verifyStoreStats(dest, 100, publishedMessageSize.get());
//consume 100 messages
consumeDurableTestMessages(connection, "sub1", 100, publishedMessageSize);
//100 should be left
verifyPendingStats(dest, subKey, 100, publishedMessageSize.get());
verifyStoreStats(dest, 100, publishedMessageSize.get());
connection.close();
}
@Override
@Test(timeout=10000)
public void testMessageSizeTwoDurables() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
org.apache.activemq.broker.region.Topic dest =
publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"},
200, publishedMessageSize, DeliveryMode.PERSISTENT);
//verify the count and size
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
//consume messages just for sub1
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
//There is still a durable that hasn't consumed so the messages should exist
SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
verifyPendingStats(dest, subKey, 0, 0);
verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
//The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100
verifyStoreStats(dest, 100, publishedMessageSize.get());
connection.stop();
}
@Override
@Test(timeout=10000)
public void testMessageSizeOneDurablePartialConsumption() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(connection,
new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
//verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
//The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100
verifyStoreStats(dest, 100, publishedMessageSize.get());
//consume all messages
consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
//All messages should now be gone
verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
//The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100
//verify the size is at least as big as 100 messages times the minimum of 100 size
verifyStoreStats(dest, 100, 100 * 100);
connection.close();
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
/**
* This test checks that pending message metrics work properly with MultiKahaDB
*
* AMQ-5923
*
*/
public class MultiKahaDBPendingMessageCursorTest extends
KahaDBPendingMessageCursorTest {
@Override
protected void initPersistence(BrokerService brokerService)
throws IOException {
broker.setPersistent(true);
//setup multi-kaha adapter
MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(dataFileDir.getRoot());
KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
kahaStore.setJournalMaxFileLength(1024 * 512);
//set up a store per destination
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
filtered.setPersistenceAdapter(kahaStore);
filtered.setPerDestination(true);
List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
stores.add(filtered);
persistenceAdapter.setFilteredPersistenceAdapters(stores);
broker.setPersistenceAdapter(persistenceAdapter);
}
}

View File

@ -308,6 +308,16 @@ public class OrderPendingListTest {
return theList.size();
}
@Override
public long messageSize() {
long size = 0;
Iterator<MessageReference> i = theList.iterator();
while (i.hasNext()) {
size += i.next().getMessage().getSize();
}
return size;
}
@Override
public Iterator<MessageReference> iterator() {
return theList.iterator();

View File

@ -16,38 +16,19 @@
*/
package org.apache.activemq.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
@ -62,7 +43,7 @@ import org.slf4j.LoggerFactory;
* AMQ-5748
*
*/
public abstract class AbstractMessageStoreSizeStatTest {
public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStatTestSupport {
protected static final Logger LOG = LoggerFactory
.getLogger(AbstractMessageStoreSizeStatTest.class);
@ -71,7 +52,6 @@ public abstract class AbstractMessageStoreSizeStatTest {
protected URI brokerConnectURI;
protected String defaultQueueName = "test.queue";
protected String defaultTopicName = "test.topic";
protected static int messageSize = 1000;
@Before
public void startBroker() throws Exception {
@ -100,39 +80,52 @@ public abstract class AbstractMessageStoreSizeStatTest {
broker.waitUntilStopped();
}
protected abstract void initPersistence(BrokerService brokerService) throws IOException;
@Test
public void testMessageSize() throws Exception {
Destination dest = publishTestQueueMessages(200);
verifyStats(dest, 200, 200 * messageSize);
@Override
protected BrokerService getBroker() {
return this.broker;
}
@Test
public void testMessageSizeAfterConsumption() throws Exception {
@Override
protected URI getBrokerConnectURI() {
return this.brokerConnectURI;
}
Destination dest = publishTestQueueMessages(200);
verifyStats(dest, 200, 200 * messageSize);
protected abstract void initPersistence(BrokerService brokerService) throws IOException;
@Test(timeout=10000)
public void testMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Destination dest = publishTestQueueMessages(200, publishedMessageSize);
verifyStats(dest, 200, publishedMessageSize.get());
}
@Test(timeout=10000)
public void testMessageSizeAfterConsumption() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Destination dest = publishTestQueueMessages(200, publishedMessageSize);
verifyStats(dest, 200, publishedMessageSize.get());
consumeTestQueueMessages();
verifyStats(dest, 0, 0);
}
@Test
@Test(timeout=10000)
public void testMessageSizeOneDurable() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200);
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200, publishedMessageSize);
//verify the count and size
verifyStats(dest, 200, 200 * messageSize);
verifyStats(dest, 200, publishedMessageSize.get());
//consume all messages
consumeDurableTestMessages(connection, "sub1", 200);
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
//All messages should now be gone
verifyStats(dest, 0, 0);
@ -142,21 +135,21 @@ public abstract class AbstractMessageStoreSizeStatTest {
@Test(timeout=10000)
public void testMessageSizeTwoDurables() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200);
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200, publishedMessageSize);
//verify the count and size
verifyStats(dest, 200, 200 * messageSize);
verifyStats(dest, 200, publishedMessageSize.get());
//consume messages just for sub1
consumeDurableTestMessages(connection, "sub1", 200);
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
//There is still a durable that hasn't consumed so the messages should exist
verifyStats(dest, 200, 200 * messageSize);
verifyStats(dest, 200, publishedMessageSize.get());
connection.stop();
@ -164,14 +157,24 @@ public abstract class AbstractMessageStoreSizeStatTest {
@Test
public void testMessageSizeAfterDestinationDeletion() throws Exception {
Destination dest = publishTestQueueMessages(200);
verifyStats(dest, 200, 200 * messageSize);
AtomicLong publishedMessageSize = new AtomicLong();
Destination dest = publishTestQueueMessages(200, publishedMessageSize);
verifyStats(dest, 200, publishedMessageSize.get());
//check that the size is 0 after deletion
broker.removeDestination(dest.getActiveMQDestination());
verifyStats(dest, 0, 0);
}
@Test
public void testQueueBrowserMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Destination dest = publishTestQueueMessages(200, publishedMessageSize);
browseTestQueueMessages(dest.getName());
verifyStats(dest, 200, publishedMessageSize.get());
}
protected void verifyStats(Destination dest, final int count, final long minimumSize) throws Exception {
final MessageStore messageStore = dest.getMessageStore();
final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
@ -203,164 +206,31 @@ public abstract class AbstractMessageStoreSizeStatTest {
}
}
/**
* Generate random 1 megabyte messages
* @param session
* @return
* @throws JMSException
*/
protected BytesMessage createMessage(Session session) throws JMSException {
final BytesMessage message = session.createBytesMessage();
final byte[] data = new byte[messageSize];
final Random rng = new Random();
rng.nextBytes(data);
message.writeBytes(data);
return message;
protected Destination publishTestQueueMessages(int count, AtomicLong publishedMessageSize) throws Exception {
return publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize);
}
protected Destination publishTestQueueMessages(int count) throws Exception {
return publishTestQueueMessages(count, defaultQueueName);
}
protected Destination publishTestQueueMessages(int count, String queueName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
queueName);
Destination dest = broker.getDestination(activeMqQueue);
// Start the connection
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
.createConnection();
connection.setClientID("clientId" + queueName);
connection.start();
Session session = connection.createSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
try {
// publish a bunch of non-persistent messages to fill up the temp
// store
MessageProducer prod = session.createProducer(queue);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < count; i++) {
prod.send(createMessage(session));
}
} finally {
connection.close();
}
return dest;
protected Destination publishTestQueueMessages(int count, String queueName, AtomicLong publishedMessageSize) throws Exception {
return publishTestQueueMessages(count, queueName, DeliveryMode.PERSISTENT,
AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize);
}
protected Destination consumeTestQueueMessages() throws Exception {
return consumeTestQueueMessages(defaultQueueName);
}
protected Destination consumeDurableTestMessages(Connection connection, String sub, int size) throws Exception {
return consumeDurableTestMessages(connection, sub, size, defaultTopicName);
protected Destination consumeDurableTestMessages(Connection connection, String sub, int size,
AtomicLong publishedMessageSize) throws Exception {
return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
}
protected Destination consumeTestQueueMessages(String queueName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
queueName);
Destination dest = broker.getDestination(activeMqQueue);
// Start the connection
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
.createConnection();
connection.setClientID("clientId2" + queueName);
connection.start();
Session session = connection.createSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
try {
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < 200; i++) {
consumer.receive();
}
} finally {
connection.stop();
}
return dest;
}
protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, String topicName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
topicName);
Destination dest = broker.getDestination(activeMqTopic);
Session session = connection.createSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
try {
TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
for (int i = 0; i < size; i++) {
consumer.receive();
}
} finally {
session.close();
}
return dest;
}
protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize, int expectedSize) throws Exception {
// create a new queue
final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
defaultTopicName);
Destination dest = broker.getDestination(activeMqTopic);
// Start the connection
Session session = connection.createSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(defaultTopicName);
for (String subName : subNames) {
session.createDurableSubscriber(topic, subName);
}
// browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
//in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
//then the statistics won't be updated properly because a new store would overwrite the old store
//which is still in use
ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
try {
// publish a bunch of non-persistent messages to fill up the temp
// store
MessageProducer prod = session.createProducer(topic);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < publishSize; i++) {
prod.send(createMessage(session));
}
//verify the view has expected messages
assertEquals(subNames.length, subs.length);
ObjectName subName = subs[0];
DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
CompositeData[] data = sub.browse();
assertNotNull(data);
assertEquals(expectedSize, data.length);
} finally {
session.close();
}
return dest;
protected Destination publishTestMessagesDurable(Connection connection, String[] subNames,
int publishSize, int expectedSize, AtomicLong publishedMessageSize) throws Exception {
return publishTestMessagesDurable(connection, subNames, defaultTopicName,
publishSize, expectedSize, AbstractStoreStatTestSupport.defaultMessageSize,
publishedMessageSize, true);
}
}

View File

@ -0,0 +1,268 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.net.URI;
import java.util.Enumeration;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
/**
*
*
*/
public abstract class AbstractStoreStatTestSupport {
protected static int defaultMessageSize = 1000;
protected abstract BrokerService getBroker();
protected abstract URI getBrokerConnectURI();
protected Destination consumeTestQueueMessages(String queueName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
queueName);
Destination dest = getBroker().getDestination(activeMqQueue);
// Start the connection
Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI())
.createConnection();
connection.setClientID("clientId2" + queueName);
connection.start();
Session session = connection.createSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
try {
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < 200; i++) {
consumer.receive();
}
} finally {
connection.stop();
}
return dest;
}
protected Destination browseTestQueueMessages(String queueName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
queueName);
Destination dest = getBroker().getDestination(activeMqQueue);
// Start the connection
Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI())
.createConnection();
connection.setClientID("clientId2" + queueName);
connection.start();
Session session = connection.createSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
try {
QueueBrowser queueBrowser = session.createBrowser(queue);
@SuppressWarnings("unchecked")
Enumeration<Message> messages = queueBrowser.getEnumeration();
while (messages.hasMoreElements()) {
messages.nextElement();
}
} finally {
connection.stop();
}
return dest;
}
protected Destination consumeDurableTestMessages(Connection connection, String sub,
int size, String topicName, AtomicLong publishedMessageSize) throws Exception {
// create a new queue
final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
topicName);
Destination dest = getBroker().getDestination(activeMqTopic);
Session session = connection.createSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
try {
TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
for (int i = 0; i < size; i++) {
ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
if (publishedMessageSize != null) {
publishedMessageSize.addAndGet(-message.getSize());
}
}
} finally {
session.close();
}
return dest;
}
protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count, String queueName,
int deliveryMode, int messageSize, AtomicLong publishedMessageSize) throws Exception {
// create a new queue
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
queueName);
Destination dest = getBroker().getDestination(activeMqQueue);
// Start the connection
Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI())
.createConnection();
connection.setClientID("clientId" + queueName);
connection.start();
Session session = connection.createSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
try {
// publish a bunch of non-persistent messages to fill up the temp
// store
MessageProducer prod = session.createProducer(queue);
prod.setDeliveryMode(deliveryMode);
for (int i = 0; i < count; i++) {
prod.send(createMessage(session, messageSize, publishedMessageSize));
}
} finally {
connection.close();
}
return (org.apache.activemq.broker.region.Queue) dest;
}
protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize,
boolean verifyBrowsing) throws Exception {
return this.publishTestMessagesDurable(connection, subNames, topicName, publishSize, expectedSize, messageSize,
publishedMessageSize, verifyBrowsing, DeliveryMode.PERSISTENT);
}
protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize,
boolean verifyBrowsing, int deliveryMode) throws Exception {
// create a new queue
final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
topicName);
Destination dest = getBroker().getDestination(activeMqTopic);
// Start the connection
Session session = connection.createSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
for (String subName : subNames) {
session.createDurableSubscriber(topic, subName);
}
ObjectName[] subs = null;
if (verifyBrowsing) {
// browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
//in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
//then the statistics won't be updated properly because a new store would overwrite the old store
//which is still in use
subs = getBroker().getAdminView().getDurableTopicSubscribers();
}
try {
// publish a bunch of non-persistent messages to fill up the temp
// store
MessageProducer prod = session.createProducer(topic);
prod.setDeliveryMode(deliveryMode);
for (int i = 0; i < publishSize; i++) {
prod.send(createMessage(session, messageSize, publishedMessageSize));
}
//verify the view has expected messages
if (verifyBrowsing) {
assertNotNull(subs);
assertEquals(subNames.length, subs.length);
ObjectName subName = subs[0];
DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
getBroker().getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
CompositeData[] data = sub.browse();
assertNotNull(data);
assertEquals(expectedSize, data.length);
}
} finally {
session.close();
}
return (org.apache.activemq.broker.region.Topic) dest;
}
/**
* Generate random messages between 100 bytes and messageSize
* @param session
* @return
* @throws JMSException
*/
protected BytesMessage createMessage(Session session, int messageSize, AtomicLong publishedMessageSize) throws JMSException {
final BytesMessage message = session.createBytesMessage();
final Random rn = new Random();
int size = rn.nextInt(messageSize - 100);
if (publishedMessageSize != null) {
publishedMessageSize.addAndGet(size);
}
final byte[] data = new byte[size];
final Random rng = new Random();
rng.nextBytes(data);
message.writeBytes(data);
return message;
}
}

View File

@ -18,12 +18,15 @@ package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
import org.apache.commons.io.FileUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,12 +42,13 @@ public class KahaDBMessageStoreSizeStatTest extends
protected static final Logger LOG = LoggerFactory
.getLogger(KahaDBMessageStoreSizeStatTest.class);
File dataFileDir = new File("target/test-amq-5748/stat-datadb");
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
@Override
protected void setUpBroker(boolean clearDataDir) throws Exception {
if (clearDataDir && dataFileDir.exists())
FileUtils.cleanDirectory(dataFileDir);
if (clearDataDir && dataFileDir.getRoot().exists())
FileUtils.cleanDirectory(dataFileDir.getRoot());
super.setUpBroker(clearDataDir);
}
@ -52,7 +56,7 @@ public class KahaDBMessageStoreSizeStatTest extends
protected void initPersistence(BrokerService brokerService)
throws IOException {
broker.setPersistent(true);
broker.setDataDirectoryFile(dataFileDir);
broker.setDataDirectoryFile(dataFileDir.getRoot());
}
/**
@ -63,19 +67,19 @@ public class KahaDBMessageStoreSizeStatTest extends
*/
@Test
public void testMessageSizeAfterRestartAndPublish() throws Exception {
Destination dest = publishTestQueueMessages(200);
AtomicLong publishedMessageSize = new AtomicLong();
Destination dest = publishTestQueueMessages(200, publishedMessageSize);
// verify the count and size
verifyStats(dest, 200, 200 * messageSize);
verifyStats(dest, 200, publishedMessageSize.get());
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
dest = publishTestQueueMessages(200);
dest = publishTestQueueMessages(200, publishedMessageSize);
// verify the count and size
verifyStats(dest, 400, 400 * messageSize);
verifyStats(dest, 400, publishedMessageSize.get());
}

View File

@ -22,12 +22,15 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
import org.apache.commons.io.FileUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,12 +46,13 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
protected static final Logger LOG = LoggerFactory
.getLogger(MultiKahaDBMessageStoreSizeStatTest.class);
File dataFileDir = new File("target/test-amq-5748/stat-datadb");
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
@Override
protected void setUpBroker(boolean clearDataDir) throws Exception {
if (clearDataDir && dataFileDir.exists())
FileUtils.cleanDirectory(dataFileDir);
if (clearDataDir && dataFileDir.getRoot().exists())
FileUtils.cleanDirectory(dataFileDir.getRoot());
super.setUpBroker(clearDataDir);
}
@ -59,7 +63,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
//setup multi-kaha adapter
MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(dataFileDir);
persistenceAdapter.setDirectory(dataFileDir.getRoot());
KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
kahaStore.setJournalMaxFileLength(1024 * 512);
@ -81,51 +85,53 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
*
* @throws Exception
*/
@Test
@Test(timeout=10000)
public void testMessageSizeAfterRestartAndPublish() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Destination dest = publishTestQueueMessages(200);
Destination dest = publishTestQueueMessages(200, publishedMessageSize);
// verify the count and size
verifyStats(dest, 200, 200 * messageSize);
verifyStats(dest, 200, publishedMessageSize.get());
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
dest = publishTestQueueMessages(200);
dest = publishTestQueueMessages(200, publishedMessageSize);
// verify the count and size
verifyStats(dest, 400, 400 * messageSize);
verifyStats(dest, 400, publishedMessageSize.get());
}
@Test
@Test(timeout=10000)
public void testMessageSizeAfterRestartAndPublishMultiQueue() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
AtomicLong publishedMessageSize2 = new AtomicLong();
Destination dest = publishTestQueueMessages(200);
Destination dest = publishTestQueueMessages(200, publishedMessageSize);
// verify the count and size
verifyStats(dest, 200, 200 * messageSize);
assertTrue(broker.getPersistenceAdapter().size() > 200 * messageSize);
verifyStats(dest, 200, publishedMessageSize.get());
assertTrue(broker.getPersistenceAdapter().size() > publishedMessageSize.get());
Destination dest2 = publishTestQueueMessages(200, "test.queue2");
Destination dest2 = publishTestQueueMessages(200, "test.queue2", publishedMessageSize2);
// verify the count and size
verifyStats(dest2, 200, 200 * messageSize);
assertTrue(broker.getPersistenceAdapter().size() > 400 * messageSize);
verifyStats(dest2, 200, publishedMessageSize2.get());
assertTrue(broker.getPersistenceAdapter().size() > publishedMessageSize.get() + publishedMessageSize2.get());
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
dest = publishTestQueueMessages(200);
dest2 = publishTestQueueMessages(200, "test.queue2");
dest = publishTestQueueMessages(200, publishedMessageSize);
dest2 = publishTestQueueMessages(200, "test.queue2", publishedMessageSize2);
// verify the count and size after publishing messages
verifyStats(dest, 400, 400 * messageSize);
verifyStats(dest2, 400, 400 * messageSize);
verifyStats(dest, 400, publishedMessageSize.get());
verifyStats(dest2, 400, publishedMessageSize2.get());
System.out.println(broker.getPersistenceAdapter().size());
assertTrue(broker.getPersistenceAdapter().size() > 800 * messageSize);
assertTrue(broker.getPersistenceAdapter().size() > publishedMessageSize.get() + publishedMessageSize2.get());
assertTrue(broker.getPersistenceAdapter().size() >=
(dest.getMessageStore().getMessageSize() + dest2.getMessageStore().getMessageSize()));

View File

@ -17,6 +17,7 @@
package org.apache.activemq.store.memory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
@ -24,6 +25,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
import org.apache.activemq.store.AbstractStoreStatTestSupport;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,21 +49,23 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat
@Override
@Test(timeout=10000)
public void testMessageSizeOneDurable() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
//The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100);
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100, publishedMessageSize);
//verify the count and size, should be 100 because of the LRUCache
verifyStats(dest, 100, 100 * messageSize);
//verify size is at least the minimum of 100 messages times 100 bytes
verifyStats(dest, 100, 100 * 100);
consumeDurableTestMessages(connection, "sub1", 100);
consumeDurableTestMessages(connection, "sub1", 100, publishedMessageSize);
//Since an LRU cache is used and messages are kept in memory, this should be 100 still
verifyStats(dest, 100, 100 * messageSize);
verifyStats(dest, 100, publishedMessageSize.get());
connection.stop();
@ -70,22 +74,24 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat
@Override
@Test(timeout=10000)
public void testMessageSizeTwoDurables() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
//The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100, so only 100 messages are kept
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100);
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100, publishedMessageSize);
//verify the count and size
verifyStats(dest, 100, 100 * messageSize);
//verify size is at least the minimum of 100 messages times 100 bytes
verifyStats(dest, 100, 100 * 100);
//consume for sub1
consumeDurableTestMessages(connection, "sub1", 100);
consumeDurableTestMessages(connection, "sub1", 100, publishedMessageSize);
//Should be 100 messages still
verifyStats(dest, 100, 100 * messageSize);
verifyStats(dest, 100, publishedMessageSize.get());
connection.stop();