This commit is contained in:
Clebert Suconic 2018-02-08 12:10:11 -05:00
commit 4ef6e3281d
57 changed files with 2553 additions and 392 deletions

View File

@ -657,6 +657,15 @@ public interface Message {
int getMemoryEstimate();
/**
* This is the size of the message when persisted on disk which is used for metrics tracking
* Note that even if the message itself is not persisted on disk (ie non-durable) this value is
* still used for metrics tracking
* If a normal message it will be the encoded message size
* If a large message it will be encoded message size + large message body size
* @return
* @throws ActiveMQException
*/
long getPersistentSize() throws ActiveMQException;
}

View File

@ -80,12 +80,52 @@ public interface QueueControl {
@Attribute(desc = "number of messages currently in this queue (includes scheduled, paged, and in-delivery messages)")
long getMessageCount();
/**
* Returns the persistent size of all messages currently in this queue. The persistent size of a message
* is the amount of space the message would take up on disk which is used to track how much data there
* is to consume on this queue
*/
@Attribute(desc = "persistent size of all messages (including durable and non-durable) currently in this queue (includes scheduled, paged, and in-delivery messages)")
long getPersistentSize();
/**
* Returns the number of durable messages currently in this queue.
*/
@Attribute(desc = "number of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)")
long getDurableMessageCount();
/**
* Returns the persistent size of durable messages currently in this queue. The persistent size of a message
* is the amount of space the message would take up on disk which is used to track how much data there
* is to consume on this queue
*/
@Attribute(desc = "persistent size of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)")
long getDurablePersistentSize();
/**
* Returns the number of scheduled messages in this queue.
*/
@Attribute(desc = "number of scheduled messages in this queue")
long getScheduledCount();
/**
* Returns the size of scheduled messages in this queue.
*/
@Attribute(desc = "persistent size of scheduled messages in this queue")
long getScheduledSize();
/**
* Returns the number of durable scheduled messages in this queue.
*/
@Attribute(desc = "number of durable scheduled messages in this queue")
long getDurableScheduledCount();
/**
* Returns the size of durable scheduled messages in this queue.
*/
@Attribute(desc = "persistent size of durable scheduled messages in this queue")
long getDurableScheduledSize();
/**
* Returns the number of consumers consuming messages from this queue.
*/
@ -98,6 +138,24 @@ public interface QueueControl {
@Attribute(desc = "number of messages that this queue is currently delivering to its consumers")
int getDeliveringCount();
/**
* Returns the persistent size of messages that this queue is currently delivering to its consumers.
*/
@Attribute(desc = "persistent size of messages that this queue is currently delivering to its consumers")
long getDeliveringSize();
/**
* Returns the number of durable messages that this queue is currently delivering to its consumers.
*/
@Attribute(desc = "number of durable messages that this queue is currently delivering to its consumers")
int getDurableDeliveringCount();
/**
* Returns the size of durable messages that this queue is currently delivering to its consumers.
*/
@Attribute(desc = "persistent size of durable messages that this queue is currently delivering to its consumers")
long getDurableDeliveringSize();
/**
* Returns the number of messages added to this queue since it was created.
*/

View File

@ -1150,4 +1150,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
}
@Override
public long getPersistentSize() throws ActiveMQException {
return getEncodeSize();
}
}

View File

@ -695,4 +695,9 @@ public class MessageInternalImpl implements MessageInternal {
return new TypedProperties(message.getTypedProperties());
}
@Override
public long getPersistentSize() throws ActiveMQException {
return message.getPersistentSize();
}
}

View File

@ -191,7 +191,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private Executor appendExecutor = null;
private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
private final ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
private final ExecutorFactory providedIOThreadPool;
protected ExecutorFactory ioExecutorFactory;
@ -2413,7 +2413,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final List<JournalFile> newFiles,
final List<Pair<String, String>> renames) throws Exception {
return JournalCompactor.writeControlFile(fileFactory, files, newFiles, renames);
return AbstractJournalUpdateTask.writeControlFile(fileFactory, files, newFiles, renames);
}
@ -2763,7 +2763,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
ArrayList<String> newFiles = new ArrayList<>();
ArrayList<Pair<String, String>> renames = new ArrayList<>();
SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles, renames);
SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames);
if (controlFile != null) {
for (String dataFile : dataFiles) {
SequentialFile file = fileFactory.createSequentialFile(dataFile);

View File

@ -24,10 +24,8 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.RefCountMessage;
@ -60,6 +58,10 @@ import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
public class AMQPMessage extends RefCountMessage {
@ -1179,4 +1181,9 @@ public class AMQPMessage extends RefCountMessage {
private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
}
@Override
public long getPersistentSize() throws ActiveMQException {
return getEncodeSize();
}
}

View File

@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.protocol.openwire;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
@ -29,6 +29,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import io.netty.buffer.ByteBuf;
// TODO: Implement this
public class OpenwireMessage implements Message {
@ -496,4 +498,9 @@ public class OpenwireMessage implements Message {
public int getMemoryEstimate() {
return 0;
}
@Override
public long getPersistentSize() throws ActiveMQException {
return 0;
}
}

View File

@ -226,6 +226,42 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public long getPersistentSize() {
checkStarted();
clearIO();
try {
return queue.getPersistentSize();
} finally {
blockOnIO();
}
}
@Override
public long getDurableMessageCount() {
checkStarted();
clearIO();
try {
return queue.getDurableMessageCount();
} finally {
blockOnIO();
}
}
@Override
public long getDurablePersistentSize() {
checkStarted();
clearIO();
try {
return queue.getDurablePersistentSize();
} finally {
blockOnIO();
}
}
@Override
public int getConsumerCount() {
checkStarted();
@ -250,6 +286,42 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public long getDeliveringSize() {
checkStarted();
clearIO();
try {
return queue.getDeliveringSize();
} finally {
blockOnIO();
}
}
@Override
public int getDurableDeliveringCount() {
checkStarted();
clearIO();
try {
return queue.getDurableDeliveringCount();
} finally {
blockOnIO();
}
}
@Override
public long getDurableDeliveringSize() {
checkStarted();
clearIO();
try {
return queue.getDurableDeliveringSize();
} finally {
blockOnIO();
}
}
@Override
public long getMessagesAdded() {
checkStarted();
@ -322,6 +394,42 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public long getScheduledSize() {
checkStarted();
clearIO();
try {
return queue.getScheduledSize();
} finally {
blockOnIO();
}
}
@Override
public long getDurableScheduledCount() {
checkStarted();
clearIO();
try {
return queue.getDurableScheduledCount();
} finally {
blockOnIO();
}
}
@Override
public long getDurableScheduledSize() {
checkStarted();
clearIO();
try {
return queue.getDurableScheduledSize();
} finally {
blockOnIO();
}
}
@Override
public String getDeadLetterAddress() {
checkStarted();
@ -998,7 +1106,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
try {
long index = 0;
long start = (page - 1) * pageSize;
long end = Math.min((long)(page * pageSize), queue.getMessageCount());
long end = Math.min(page * pageSize, queue.getMessageCount());
ArrayList<CompositeData> c = new ArrayList<>();
Filter thefilter = FilterImpl.createFilter(filter);

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.paging;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -38,4 +39,13 @@ public interface PagedMessage extends EncodingSupport {
void initMessage(StorageManager storageManager);
long getTransactionID();
/**
* This is the size of the message when persisted on disk and is used for metrics tracking
* If a normal message it will be the encoded message size
* If a large message it will be encoded message size + large message body size
* @return
* @throws ActiveMQException
*/
long getPersistentSize() throws ActiveMQException;
}

View File

@ -28,6 +28,10 @@ public interface PagePosition extends Comparable<PagePosition> {
int getMessageNr();
long getPersistentSize();
void setPersistentSize(long persistentSize);
PagePosition nextMessage();
PagePosition nextPage();

View File

@ -44,6 +44,8 @@ public interface PageSubscription {
long getMessageCount();
long getPersistentSize();
long getId();
boolean isPersistent();
@ -161,4 +163,11 @@ public interface PageSubscription {
* @throws Exception
*/
void onDeletePage(Page deletedPage) throws Exception;
long getDeliveredCount();
long getDeliveredSize();
void incrementDeliveredSize(long size);
}

View File

@ -26,13 +26,17 @@ public interface PageSubscriptionCounter {
long getValue();
void increment(Transaction tx, int add) throws Exception;
long getPersistentSizeAdded();
void loadValue(long recordValueID, long value);
long getPersistentSize();
void loadInc(long recordInd, int add);
void increment(Transaction tx, int add, long persistentSize) throws Exception;
void applyIncrementOnTX(Transaction tx, long recordID, int add);
void loadValue(long recordValueID, long value, long persistentSize);
void loadInc(long recordInd, int add, long persistentSize);
void applyIncrementOnTX(Transaction tx, long recordID, int add, long persistentSize);
/**
* This will process the reload
@ -43,12 +47,12 @@ public interface PageSubscriptionCounter {
* @param id
* @param variance
*/
void addInc(long id, int variance);
void addInc(long id, int variance, long size);
// used when deleting the counter
void delete() throws Exception;
void pendingCounter(Page page, int increment) throws Exception;
void pendingCounter(Page page, int increment, long persistentSize) throws Exception;
// used when leaving page mode, so the counters are deleted in batches
// for each queue on the address

View File

@ -59,6 +59,8 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
private final long messageID;
private long messageSize = -1;
@Override
public Object getProtocolData() {
return protocolData;
@ -104,6 +106,9 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
this.largeMessage = message.getMessage().isLargeMessage();
this.transactionID = message.getTransactionID();
this.messageID = message.getMessage().getMessageID();
//pre-cache the message size so we don't have to reload the message later if it is GC'd
getPersistentSize();
}
@Override
@ -191,7 +196,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
@Override
public void handled() {
getQueue().referenceHandled();
getQueue().referenceHandled(this);
}
@Override
@ -280,4 +285,16 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
return messageID;
}
@Override
public long getPersistentSize() {
if (messageSize == -1) {
try {
messageSize = getPagedMessage().getPersistentSize();
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorCalculatePersistentSize(e);
}
}
return messageSize;
}
}

View File

@ -36,6 +36,12 @@ public class PagePositionImpl implements PagePosition {
*/
private long recordID = -1;
/**
* Optional size value that can be set to specify the peristent size of the message
* for metrics tracking purposes
*/
private long persistentSize;
/**
* @param pageNr
* @param messageNr
@ -82,6 +88,22 @@ public class PagePositionImpl implements PagePosition {
return messageNr;
}
/**
* @return the persistentSize
*/
@Override
public long getPersistentSize() {
return persistentSize;
}
/**
* @param persistentSize the persistentSize to set
*/
@Override
public void setPersistentSize(long persistentSize) {
this.persistentSize = persistentSize;
}
@Override
public int compareTo(PagePosition o) {
if (pageNr > o.getPageNr()) {

View File

@ -21,10 +21,10 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page;
@ -60,10 +60,13 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
private final Executor executor;
private final AtomicLong value = new AtomicLong(0);
private final AtomicLong persistentSize = new AtomicLong(0);
private final AtomicLong added = new AtomicLong(0);
private final AtomicLong addedPersistentSize = new AtomicLong(0);
private final AtomicLong pendingValue = new AtomicLong(0);
private final AtomicLong pendingPersistentSize = new AtomicLong(0);
private final LinkedList<Long> incrementRecords = new LinkedList<>();
@ -71,9 +74,9 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
// we will recount a page case we still see pending records
// as soon as we close a page we remove these records replacing by a regular page increment record
// A Map per pageID, each page will have a set of IDs, with the increment on each one
private final Map<Long, Pair<Long, AtomicInteger>> pendingCounters = new HashMap<>();
private final Map<Long, PendingCounter> pendingCounters = new HashMap<>();
private LinkedList<Pair<Long, Integer>> loadList;
private LinkedList<PendingCounter> loadList;
private final Runnable cleanupCheck = new Runnable() {
@Override
@ -104,6 +107,16 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
return value.get() + pendingValue.get();
}
@Override
public long getPersistentSizeAdded() {
return addedPersistentSize.get() + pendingPersistentSize.get();
}
@Override
public long getPersistentSize() {
return persistentSize.get() + pendingPersistentSize.get();
}
/**
* This is used only on non transactional paging
*
@ -112,24 +125,25 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
* @throws Exception
*/
@Override
public synchronized void pendingCounter(Page page, int increment) throws Exception {
public synchronized void pendingCounter(Page page, int increment, long size) throws Exception {
if (!persistent) {
return; // nothing to be done
}
Pair<Long, AtomicInteger> pendingInfo = pendingCounters.get((long) page.getPageId());
PendingCounter pendingInfo = pendingCounters.get((long) page.getPageId());
if (pendingInfo == null) {
// We have to make sure this is sync here
// not syncing this to disk may cause the page files to be out of sync on pages.
// we can't afford the case where a page file is written without a record here
long id = storage.storePendingCounter(this.subscriptionID, page.getPageId(), increment);
pendingInfo = new Pair<>(id, new AtomicInteger(1));
pendingInfo = new PendingCounter(id, increment, size);
pendingCounters.put((long) page.getPageId(), pendingInfo);
} else {
pendingInfo.getB().addAndGet(increment);
pendingInfo.addAndGet(increment, size);
}
pendingValue.addAndGet(increment);
pendingPersistentSize.addAndGet(size);
page.addPendingCounter(this);
}
@ -141,23 +155,25 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
*/
@Override
public void cleanupNonTXCounters(final long pageID) throws Exception {
Pair<Long, AtomicInteger> pendingInfo;
PendingCounter pendingInfo;
synchronized (this) {
pendingInfo = pendingCounters.remove(pageID);
}
if (pendingInfo != null) {
final AtomicInteger valueCleaned = pendingInfo.getB();
final int valueCleaned = pendingInfo.getCount();
final long valueSizeCleaned = pendingInfo.getPersistentSize();
Transaction tx = new TransactionImpl(storage);
storage.deletePendingPageCounter(tx.getID(), pendingInfo.getA());
storage.deletePendingPageCounter(tx.getID(), pendingInfo.getId());
// To apply the increment of the value just being cleaned
increment(tx, valueCleaned.get());
increment(tx, valueCleaned, valueSizeCleaned);
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
pendingValue.addAndGet(-valueCleaned.get());
pendingValue.addAndGet(-valueCleaned);
pendingPersistentSize.updateAndGet(val -> val >= valueSizeCleaned ? val - valueSizeCleaned : 0);
}
});
@ -166,21 +182,21 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
}
@Override
public void increment(Transaction tx, int add) throws Exception {
public void increment(Transaction tx, int add, long size) throws Exception {
if (tx == null) {
if (persistent) {
long id = storage.storePageCounterInc(this.subscriptionID, add);
incrementProcessed(id, add);
long id = storage.storePageCounterInc(this.subscriptionID, add, size);
incrementProcessed(id, add, size);
} else {
incrementProcessed(-1, add);
incrementProcessed(-1, add, size);
}
} else {
if (persistent) {
tx.setContainsPersistent();
long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
applyIncrementOnTX(tx, id, add);
long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add, size);
applyIncrementOnTX(tx, id, add, size);
} else {
applyIncrementOnTX(tx, -1, add);
applyIncrementOnTX(tx, -1, add, size);
}
}
}
@ -193,7 +209,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
* @param add
*/
@Override
public void applyIncrementOnTX(Transaction tx, long recordID1, int add) {
public void applyIncrementOnTX(Transaction tx, long recordID1, int add, long size) {
CounterOperations oper = (CounterOperations) tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
if (oper == null) {
@ -202,22 +218,24 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
tx.addOperation(oper);
}
oper.operations.add(new ItemOper(this, recordID1, add));
oper.operations.add(new ItemOper(this, recordID1, add, size));
}
@Override
public synchronized void loadValue(final long recordID1, final long value1) {
public synchronized void loadValue(final long recordID1, final long value1, long size) {
if (this.subscription != null) {
// it could be null on testcases... which is ok
this.subscription.notEmpty();
}
this.value.set(value1);
this.added.set(value1);
this.persistentSize.set(size);
this.addedPersistentSize.set(size);
this.recordID = recordID1;
}
public synchronized void incrementProcessed(long id, int add) {
addInc(id, add);
public synchronized void incrementProcessed(long id, int add, long size) {
addInc(id, add, size);
if (incrementRecords.size() > FLUSH_COUNTER) {
executor.execute(cleanupCheck);
}
@ -259,12 +277,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
}
@Override
public void loadInc(long id, int add) {
public void loadInc(long id, int add, long size) {
if (loadList == null) {
loadList = new LinkedList<>();
}
loadList.add(new Pair<>(id, add));
loadList.add(new PendingCounter(id, add, size));
}
@Override
@ -275,10 +293,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
subscription.notEmpty();
}
for (Pair<Long, Integer> incElement : loadList) {
value.addAndGet(incElement.getB());
added.addAndGet(incElement.getB());
incrementRecords.add(incElement.getA());
for (PendingCounter incElement : loadList) {
value.addAndGet(incElement.getCount());
added.addAndGet(incElement.getCount());
persistentSize.addAndGet(incElement.getPersistentSize());
addedPersistentSize.addAndGet(incElement.getPersistentSize());
incrementRecords.add(incElement.getId());
}
loadList.clear();
loadList = null;
@ -286,11 +306,15 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
}
@Override
public synchronized void addInc(long id, int variance) {
public synchronized void addInc(long id, int variance, long size) {
value.addAndGet(variance);
this.persistentSize.addAndGet(size);
if (variance > 0) {
added.addAndGet(variance);
}
if (size > 0) {
addedPersistentSize.addAndGet(size);
}
if (id >= 0) {
incrementRecords.add(id);
}
@ -310,11 +334,13 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
ArrayList<Long> deleteList;
long valueReplace;
long sizeReplace;
synchronized (this) {
if (incrementRecords.size() <= FLUSH_COUNTER) {
return;
}
valueReplace = value.get();
sizeReplace = persistentSize.get();
deleteList = new ArrayList<>(incrementRecords);
incrementRecords.clear();
}
@ -332,7 +358,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
storage.deletePageCounter(txCleanup, recordID);
}
newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace, sizeReplace);
if (logger.isTraceEnabled()) {
logger.trace("Replacing page-counter record = " + recordID + " by record = " + newRecordID + " on subscriptionID = " + this.subscriptionID + " for queue = " + this.subscription.getQueue().getName());
@ -354,10 +380,11 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
private static class ItemOper {
private ItemOper(PageSubscriptionCounterImpl counter, long id, int add) {
private ItemOper(PageSubscriptionCounterImpl counter, long id, int add, long persistentSize) {
this.counter = counter;
this.id = id;
this.amount = add;
this.persistentSize = persistentSize;
}
PageSubscriptionCounterImpl counter;
@ -365,6 +392,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
long id;
int amount;
long persistentSize;
}
private static class CounterOperations extends TransactionOperationAbstract implements TransactionOperation {
@ -374,8 +403,55 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
@Override
public void afterCommit(Transaction tx) {
for (ItemOper oper : operations) {
oper.counter.incrementProcessed(oper.id, oper.amount);
oper.counter.incrementProcessed(oper.id, oper.amount, oper.persistentSize);
}
}
}
private static class PendingCounter {
private static final AtomicIntegerFieldUpdater<PendingCounter> COUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PendingCounter.class, "count");
private static final AtomicLongFieldUpdater<PendingCounter> SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(PendingCounter.class, "persistentSize");
private final long id;
private volatile int count;
private volatile long persistentSize;
/**
* @param id
* @param count
* @param size
*/
PendingCounter(long id, int count, long persistentSize) {
super();
this.id = id;
this.count = count;
this.persistentSize = persistentSize;
}
/**
* @return the id
*/
public long getId() {
return id;
}
/**
* @return the count
*/
public int getCount() {
return count;
}
/**
* @return the size
*/
public long getPersistentSize() {
return persistentSize;
}
public void addAndGet(int count, long persistentSize) {
COUNT_UPDATER.addAndGet(this, count);
SIZE_UPDATER.addAndGet(this, persistentSize);
}
}
}

View File

@ -96,6 +96,8 @@ final class PageSubscriptionImpl implements PageSubscription {
private final AtomicLong deliveredCount = new AtomicLong(0);
private final AtomicLong deliveredSize = new AtomicLong(0);
PageSubscriptionImpl(final PageCursorProvider cursorProvider,
final PagingStore pageStore,
final StorageManager store,
@ -177,6 +179,18 @@ final class PageSubscriptionImpl implements PageSubscription {
}
}
@Override
public long getPersistentSize() {
if (empty) {
return 0;
} else {
//A negative value could happen if an old journal was loaded that didn't have
//size metrics for old records
long messageSize = counter.getPersistentSize() - deliveredSize.get();
return messageSize > 0 ? messageSize : 0;
}
}
@Override
public PageSubscriptionCounter getCounter() {
return counter;
@ -439,7 +453,7 @@ final class PageSubscriptionImpl implements PageSubscription {
public void ackTx(final Transaction tx, final PagedReference reference) throws Exception {
confirmPosition(tx, reference.getPosition());
counter.increment(tx, -1);
counter.increment(tx, -1, -getPersistentSize(reference));
PageTransactionInfo txInfo = getPageTransaction(reference);
if (txInfo != null) {
@ -831,6 +845,12 @@ final class PageSubscriptionImpl implements PageSubscription {
}
PageCursorInfo info = getPageInfo(position);
PageCache cache = info.getCache();
long size = 0;
if (cache != null) {
size = getPersistentSize(cache.getMessage(position.getMessageNr()));
position.setPersistentSize(size);
}
logger.tracef("InstallTXCallback looking up pagePosition %s, result=%s", position, info);
@ -1060,6 +1080,13 @@ final class PageSubscriptionImpl implements PageSubscription {
}
}
/**
* @return the cache
*/
public PageCache getCache() {
return cache != null ? cache.get() : null;
}
}
private final class PageCursorTX extends TransactionOperationAbstract {
@ -1087,6 +1114,7 @@ final class PageSubscriptionImpl implements PageSubscription {
for (PagePosition confirmed : positions) {
cursor.processACK(confirmed);
cursor.deliveredCount.decrementAndGet();
cursor.deliveredSize.addAndGet(-confirmed.getPersistentSize());
}
}
@ -1309,4 +1337,43 @@ final class PageSubscriptionImpl implements PageSubscription {
public void close() {
}
}
/**
* @return the deliveredCount
*/
@Override
public long getDeliveredCount() {
return deliveredCount.get();
}
/**
* @return the deliveredSize
*/
@Override
public long getDeliveredSize() {
return deliveredSize.get();
}
@Override
public void incrementDeliveredSize(long size) {
deliveredSize.addAndGet(size);
}
private long getPersistentSize(PagedMessage msg) {
try {
return msg != null && msg.getPersistentSize() > 0 ? msg.getPersistentSize() : 0;
} catch (ActiveMQException e) {
logger.warn("Error computing persistent size of message: " + msg, e);
return 0;
}
}
private long getPersistentSize(PagedReference ref) {
try {
return ref != null && ref.getPersistentSize() > 0 ? ref.getPersistentSize() : 0;
} catch (ActiveMQException e) {
logger.warn("Error computing persistent size of message: " + ref, e);
return 0;
}
}
}

View File

@ -20,6 +20,7 @@ import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.paging.PagedMessage;
@ -169,4 +170,9 @@ public class PagedMessageImpl implements PagedMessage {
message +
"]";
}
@Override
public long getPersistentSize() throws ActiveMQException {
return message.getPersistentSize();
}
}

View File

@ -840,7 +840,8 @@ public class PagingStoreImpl implements PagingStore {
// the apply counter will make sure we write a record on journal
// especially on the case for non transactional sends and paging
// doing this will give us a possibility of recovering the page counters
applyPageCounters(tx, getCurrentPage(), listCtx);
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
applyPageCounters(tx, getCurrentPage(), listCtx, persistentSize);
currentPage.write(pagedMessage);
@ -906,22 +907,22 @@ public class PagingStoreImpl implements PagingStore {
* @param ctx
* @throws Exception
*/
private void applyPageCounters(Transaction tx, Page page, RouteContextList ctx) throws Exception {
private void applyPageCounters(Transaction tx, Page page, RouteContextList ctx, long size) throws Exception {
List<org.apache.activemq.artemis.core.server.Queue> durableQueues = ctx.getDurableQueues();
List<org.apache.activemq.artemis.core.server.Queue> nonDurableQueues = ctx.getNonDurableQueues();
for (org.apache.activemq.artemis.core.server.Queue q : durableQueues) {
if (tx == null) {
// non transactional writes need an intermediate place
// to avoid the counter getting out of sync
q.getPageSubscription().getCounter().pendingCounter(page, 1);
q.getPageSubscription().getCounter().pendingCounter(page, 1, size);
} else {
// null tx is treated through pending counters
q.getPageSubscription().getCounter().increment(tx, 1);
q.getPageSubscription().getCounter().increment(tx, 1, size);
}
}
for (org.apache.activemq.artemis.core.server.Queue q : nonDurableQueues) {
q.getPageSubscription().getCounter().increment(tx, 1);
q.getPageSubscription().getCounter().increment(tx, 1, size);
}
}

View File

@ -336,7 +336,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
/**
* @return The ID with the stored counter
*/
long storePageCounter(long txID, long queueID, long value) throws Exception;
long storePageCounter(long txID, long queueID, long value, long persistentSize) throws Exception;
long storePendingCounter(long queueID, long pageID, int inc) throws Exception;
@ -350,13 +350,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
* @return the ID with the increment record
* @throws Exception
*/
long storePageCounterInc(long txID, long queueID, int add) throws Exception;
long storePageCounterInc(long txID, long queueID, int add, long persistentSize) throws Exception;
/**
* @return the ID with the increment record
* @throws Exception
*/
long storePageCounterInc(long queueID, int add) throws Exception;
long storePageCounterInc(long queueID, int add, long size) throws Exception;
/**
* @return the bindings journal

View File

@ -16,7 +16,13 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import javax.transaction.xa.Xid;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
import java.io.File;
import java.io.FileInputStream;
import java.security.DigestInputStream;
@ -37,6 +43,8 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Message;
@ -109,13 +117,6 @@ import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
/**
* Controls access to the journals and other storage files such as the ones used to store pages and
* large messages. This class must control writing of any non-transient data, as it is the key point
@ -1084,7 +1085,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
sub.getCounter().loadValue(record.id, encoding.getValue());
sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize());
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
messageJournal.appendDeleteRecord(record.id, false);
@ -1101,7 +1102,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
sub.getCounter().loadInc(record.id, encoding.getValue());
sub.getCounter().loadInc(record.id, encoding.getValue(), encoding.getPersistentSize());
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID());
messageJournal.appendDeleteRecord(record.id, false);
@ -1136,6 +1137,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
case JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER: {
PageCountPendingImpl pendingCountEncoding = new PageCountPendingImpl();
pendingCountEncoding.decode(buff);
pendingCountEncoding.setID(record.id);
@ -1143,6 +1145,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
if (pendingNonTXPageCounter != null) {
pendingNonTXPageCounter.add(pendingCountEncoding);
}
break;
}
@ -1349,11 +1352,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
@Override
public long storePageCounterInc(long txID, long queueID, int value) throws Exception {
public long storePageCounterInc(long txID, long queueID, int value, long persistentSize) throws Exception {
readLock();
try {
long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value));
messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value, persistentSize));
return recordID;
} finally {
readUnLock();
@ -1361,11 +1364,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
@Override
public long storePageCounterInc(long queueID, int value) throws Exception {
public long storePageCounterInc(long queueID, int value, long persistentSize) throws Exception {
readLock();
try {
final long recordID = idGenerator.generateID();
messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value), true, getContext());
messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value, persistentSize), true, getContext());
return recordID;
} finally {
readUnLock();
@ -1373,11 +1376,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
@Override
public long storePageCounter(long txID, long queueID, long value) throws Exception {
public long storePageCounter(long txID, long queueID, long value, long persistentSize) throws Exception {
readLock();
try {
final long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value));
messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value, persistentSize));
return recordID;
} finally {
readUnLock();
@ -1789,7 +1792,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue());
sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue(), encoding.getPersistentSize());
sub.notEmpty();
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());

View File

@ -16,7 +16,29 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import javax.transaction.xa.Xid;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_REF;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_BINDING_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_SETTING_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE_PROTOCOL;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_REF;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.HEURISTIC_COMPLETION;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ID_COUNTER_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COMPLETE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_TRANSACTION;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_BINDING_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_STATUS_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SECURITY_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT;
import java.io.File;
import java.io.PrintStream;
import java.util.HashMap;
@ -24,6 +46,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Message;
@ -58,29 +82,6 @@ import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.XidCodecSupport;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_REF;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_SETTING_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE_PROTOCOL;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_REF;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.HEURISTIC_COMPLETION;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ID_COUNTER_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COMPLETE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_TRANSACTION;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_BINDING_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_STATUS_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SECURITY_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_BINDING_RECORD;
/**
* Outputs a String description of the Journals contents.
@ -217,9 +218,9 @@ public final class DescribeJournal {
out.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.getValue());
}
subsCounter.loadValue(info.id, encoding.getValue());
subsCounter.loadValue(info.id, encoding.getValue(), encoding.getPersistentSize());
subsCounter.processReload();
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + ", result=" + subsCounter.getValue());
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " persistentSize=" + subsCounter.getPersistentSize() + ", result=" + subsCounter.getValue());
if (subsCounter.getValue() < 0) {
out.println(" #NegativeCounter!!!!");
} else {
@ -232,9 +233,9 @@ public final class DescribeJournal {
PageSubscriptionCounterImpl subsCounter = lookupCounter(counters, queueIDForCounter);
subsCounter.loadInc(info.id, encoding.getValue());
subsCounter.loadInc(info.id, encoding.getValue(), encoding.getPersistentSize());
subsCounter.processReload();
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " increased by " + encoding.getValue());
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " persistentSize=" + subsCounter.getPersistentSize() + " increased by " + encoding.getValue());
if (subsCounter.getValue() < 0) {
out.println(" #NegativeCounter!!!!");
} else {
@ -321,7 +322,7 @@ public final class DescribeJournal {
subsCounter = lookupCounter(counters, queueIDForCounter);
subsCounter.loadValue(info.id, encoding.getValue());
subsCounter.loadValue(info.id, encoding.getValue(), encoding.getPersistentSize());
subsCounter.processReload();
} else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
PageCountRecordInc encoding = (PageCountRecordInc) o;
@ -329,7 +330,7 @@ public final class DescribeJournal {
subsCounter = lookupCounter(counters, queueIDForCounter);
subsCounter.loadInc(info.id, encoding.getValue());
subsCounter.loadInc(info.id, encoding.getValue(), encoding.getPersistentSize());
subsCounter.processReload();
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -35,6 +34,8 @@ import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
import io.netty.buffer.Unpooled;
public final class LargeServerMessageImpl extends CoreMessage implements LargeServerMessage {
// Constants -----------------------------------------------------
@ -344,6 +345,13 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
return file;
}
@Override
public long getPersistentSize() throws ActiveMQException {
long size = super.getPersistentSize();
size += getBodyEncoder().getLargeBodySize();
return size;
}
@Override
public String toString() {
return "LargeServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() +

View File

@ -23,9 +23,10 @@ import org.apache.activemq.artemis.utils.DataConstants;
public class PageCountPendingImpl implements EncodingSupport, PageCountPending {
@Override
public String toString() {
return "PageCountPending [queueID=" + queueID + ", pageID=" + pageID + "]";
return "PageCountPendingImpl [queueID=" + queueID + ", pageID=" + pageID + "]";
}
public PageCountPendingImpl() {
@ -64,7 +65,7 @@ public class PageCountPendingImpl implements EncodingSupport, PageCountPending {
@Override
public int getEncodeSize() {
return DataConstants.SIZE_LONG * 2;
return DataConstants.SIZE_LONG * 3;
}
@Override

View File

@ -26,18 +26,21 @@ public class PageCountRecord implements EncodingSupport {
private long value;
private long persistentSize;
@Override
public String toString() {
return "PageCountRecord [queueID=" + queueID + ", value=" + value + "]";
return "PageCountRecord [queueID=" + queueID + ", value=" + value + ", persistentSize=" + persistentSize + "]";
}
public PageCountRecord() {
}
public PageCountRecord(long queueID, long value) {
public PageCountRecord(long queueID, long value, long persistentSize) {
this.queueID = queueID;
this.value = value;
this.persistentSize = persistentSize;
}
public long getQueueID() {
@ -48,21 +51,30 @@ public class PageCountRecord implements EncodingSupport {
return value;
}
public long getPersistentSize() {
return persistentSize;
}
@Override
public int getEncodeSize() {
return DataConstants.SIZE_LONG * 2;
return DataConstants.SIZE_LONG * 3;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
buffer.writeLong(value);
buffer.writeLong(persistentSize);
}
@Override
public void decode(ActiveMQBuffer buffer) {
queueID = buffer.readLong();
value = buffer.readLong();
if (buffer.readableBytes() > 0) {
persistentSize = buffer.readLong();
}
}
}

View File

@ -26,17 +26,20 @@ public class PageCountRecordInc implements EncodingSupport {
private int value;
private long persistentSize;
@Override
public String toString() {
return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + "]";
return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + ", persistentSize=" + persistentSize + "]";
}
public PageCountRecordInc() {
}
public PageCountRecordInc(long queueID, int value) {
public PageCountRecordInc(long queueID, int value, long persistentSize) {
this.queueID = queueID;
this.value = value;
this.persistentSize = persistentSize;
}
public long getQueueID() {
@ -47,21 +50,30 @@ public class PageCountRecordInc implements EncodingSupport {
return value;
}
public long getPersistentSize() {
return persistentSize;
}
@Override
public int getEncodeSize() {
return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
return 2 * DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
buffer.writeInt(value);
buffer.writeLong(persistentSize);
}
@Override
public void decode(ActiveMQBuffer buffer) {
queueID = buffer.readLong();
value = buffer.readInt();
if (buffer.readableBytes() > 0) {
persistentSize = buffer.readLong();
}
}
}

View File

@ -467,7 +467,7 @@ public class NullStorageManager implements StorageManager {
}
@Override
public long storePageCounter(final long txID, final long queueID, final long value) throws Exception {
public long storePageCounter(final long txID, final long queueID, final long value, final long size) throws Exception {
return 0;
}
@ -489,12 +489,12 @@ public class NullStorageManager implements StorageManager {
}
@Override
public long storePageCounterInc(final long txID, final long queueID, final int add) throws Exception {
public long storePageCounterInc(final long txID, final long queueID, final int add, final long size) throws Exception {
return 0;
}
@Override
public long storePageCounterInc(final long queueID, final int add) throws Exception {
public long storePageCounterInc(final long queueID, final int add, final long size) throws Exception {
return 0;
}

View File

@ -1480,7 +1480,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
public void afterPrepare(final Transaction tx) {
for (MessageReference ref : refs) {
if (ref.isAlreadyAcked()) {
ref.getQueue().referenceHandled();
ref.getQueue().referenceHandled(ref);
ref.getQueue().incrementMesssagesAdded();
}
}

View File

@ -1911,4 +1911,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224088, value = "Timeout ({0} seconds) while handshaking has occurred.", format = Message.Format.MESSAGE_FORMAT)
void handshakeTimeout(int timeout);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224089, value = "Failed to calculate persistent size", format = Message.Format.MESSAGE_FORMAT)
void errorCalculatePersistentSize(@Cause Throwable e);
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
@ -99,4 +100,14 @@ public interface MessageReference {
void setAlreadyAcked();
boolean isAlreadyAcked();
/**
* This is the size of the message when persisted on disk which is used for metrics tracking
* Note that even if the message itself is not persisted on disk (ie non-durable) this value is
* still used for metrics tracking for the amount of data on a queue
*
* @return
* @throws ActiveMQException
*/
long getPersistentSize() throws ActiveMQException;
}

View File

@ -138,12 +138,42 @@ public interface Queue extends Bindable,CriticalComponent {
long getMessageCount();
/**
* This is the size of the messages in the queue when persisted on disk which is used for metrics tracking
* to give an idea of the amount of data on the queue to be consumed
*
* Note that this includes all messages on the queue, even messages that are non-durable which may only be in memory
*/
long getPersistentSize();
/**
* This is the number of the durable messages in the queue
*/
long getDurableMessageCount();
/**
* This is the persistent size of all the durable messages in the queue
*/
long getDurablePersistentSize();
int getDeliveringCount();
void referenceHandled();
long getDeliveringSize();
int getDurableDeliveringCount();
long getDurableDeliveringSize();
void referenceHandled(MessageReference ref);
int getScheduledCount();
long getScheduledSize();
int getDurableScheduledCount();
long getDurableScheduledSize();
List<MessageReference> getScheduledMessages();
/**
@ -314,8 +344,6 @@ public interface Queue extends Bindable,CriticalComponent {
*/
SimpleString getUser();
void decDelivering(int size);
/** This is to perform a check on the counter again */
void recheckRefCount(OperationContext context);

View File

@ -27,6 +27,12 @@ public interface ScheduledDeliveryHandler {
int getScheduledCount();
long getScheduledSize();
int getDurableScheduledCount();
long getDurableScheduledSize();
List<MessageReference> getScheduledReferences();
List<MessageReference> cancel(Filter filter) throws ActiveMQException;

View File

@ -722,7 +722,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
refs.remove(message.getMessageID());
// The delivering count should also be decreased as to avoid inconsistencies
((QueueImpl) ref.getQueue()).decDelivering();
((QueueImpl) ref.getQueue()).decDelivering(ref);
}
connectionFailed(e, false);

View File

@ -20,6 +20,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -116,7 +117,7 @@ public class LastValueQueue extends QueueImpl {
} else {
// We keep the current ref and ack the one we are returning
super.referenceHandled();
super.referenceHandled(ref);
try {
super.acknowledge(ref);
@ -139,7 +140,7 @@ public class LastValueQueue extends QueueImpl {
private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
MessageReference oldRef = hr.getReference();
referenceHandled();
referenceHandled(ref);
try {
oldRef.acknowledge();
@ -323,6 +324,11 @@ public class LastValueQueue extends QueueImpl {
public Long getConsumerId() {
return this.consumerId;
}
@Override
public long getPersistentSize() throws ActiveMQException {
return ref.getPersistentSize();
}
}
@Override

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.impl;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@ -158,7 +159,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
@Override
public void handled() {
queue.referenceHandled();
queue.referenceHandled(this);
}
@Override
@ -239,4 +240,9 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
public int hashCode() {
return this.getMessage().hashCode();
}
@Override
public long getPersistentSize() throws ActiveMQException {
return this.getMessage().getPersistentSize();
}
}

View File

@ -23,6 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
@ -364,16 +365,25 @@ public class PostOfficeJournalLoader implements JournalLoader {
List<PagedMessage> pgMessages = pg.read(storageManager);
Map<Long, AtomicInteger> countsPerQueueOnPage = new HashMap<>();
Map<Long, AtomicLong> sizePerQueueOnPage = new HashMap<>();
for (PagedMessage pgd : pgMessages) {
if (pgd.getTransactionID() <= 0) {
for (long q : pgd.getQueueIDs()) {
AtomicInteger countQ = countsPerQueueOnPage.get(q);
AtomicLong sizeQ = sizePerQueueOnPage.get(q);
if (countQ == null) {
countQ = new AtomicInteger(0);
countsPerQueueOnPage.put(q, countQ);
}
if (sizeQ == null) {
sizeQ = new AtomicLong(0);
sizePerQueueOnPage.put(q, sizeQ);
}
countQ.incrementAndGet();
if (pgd.getPersistentSize() > 0) {
sizeQ.addAndGet(pgd.getPersistentSize());
}
}
}
}
@ -387,12 +397,13 @@ public class PostOfficeJournalLoader implements JournalLoader {
PageSubscriptionCounter counter = store.getCursorProvider().getSubscription(entry.getKey()).getCounter();
AtomicInteger value = countsPerQueueOnPage.get(entry.getKey());
AtomicLong sizeValue = sizePerQueueOnPage.get(entry.getKey());
if (value == null) {
logger.debug("Page " + entry.getKey() + " wasn't open, so we will just ignore");
} else {
logger.debug("Replacing counter " + value.get());
counter.increment(txRecoverCounter, value.get());
counter.increment(txRecoverCounter, value.get(), sizeValue.get());
}
}
} else {

View File

@ -171,6 +171,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
private final AtomicInteger queueMemorySize = new AtomicInteger(0);
private final QueuePendingMessageMetrics pendingMetrics = new QueuePendingMessageMetrics(this);
private final QueuePendingMessageMetrics deliveringMetrics = new QueuePendingMessageMetrics(this);
// used to control if we should recalculate certain positions inside deliverAsync
private volatile boolean consumersChanged = true;
@ -186,8 +190,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private AtomicLong messagesKilled = new AtomicLong(0);
protected final AtomicInteger deliveringCount = new AtomicInteger(0);
private boolean paused;
private long pauseStatusRecord = -1;
@ -452,7 +454,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.server = server;
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this);
if (addressSettingsRepository != null) {
addressSettingsRepositoryListener = new AddressSettingsRepositoryListener();
@ -1118,17 +1120,67 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (pageSubscription != null) {
// messageReferences will have depaged messages which we need to discount from the counter as they are
// counted on the pageSubscription as well
return messageReferences.size() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
return pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
} else {
return messageReferences.size() + getScheduledCount() + deliveringCount.get();
return pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount();
}
}
@Override
public long getPersistentSize() {
if (pageSubscription != null) {
// messageReferences will have depaged messages which we need to discount from the counter as they are
// counted on the pageSubscription as well
return pendingMetrics.getPersistentSize() + getScheduledSize() + getDeliveringSize() + pageSubscription.getPersistentSize();
} else {
return pendingMetrics.getPersistentSize() + getScheduledSize() + getDeliveringSize();
}
}
@Override
public long getDurableMessageCount() {
if (isDurable()) {
if (pageSubscription != null) {
return pendingMetrics.getDurableMessageCount() + getDurableScheduledCount() + getDurableDeliveringCount() + pageSubscription.getMessageCount();
} else {
return pendingMetrics.getDurableMessageCount() + getDurableScheduledCount() + getDurableDeliveringCount();
}
}
return 0;
}
@Override
public long getDurablePersistentSize() {
if (isDurable()) {
if (pageSubscription != null) {
return pendingMetrics.getDurablePersistentSize() + getDurableScheduledSize() + getDurableDeliveringSize() + pageSubscription.getPersistentSize();
} else {
return pendingMetrics.getDurablePersistentSize() + getDurableScheduledSize() + getDurableDeliveringSize();
}
}
return 0;
}
@Override
public synchronized int getScheduledCount() {
return scheduledDeliveryHandler.getScheduledCount();
}
@Override
public synchronized long getScheduledSize() {
return scheduledDeliveryHandler.getScheduledSize();
}
@Override
public synchronized int getDurableScheduledCount() {
return scheduledDeliveryHandler.getDurableScheduledCount();
}
@Override
public synchronized long getDurableScheduledSize() {
return scheduledDeliveryHandler.getDurableScheduledSize();
}
@Override
public synchronized List<MessageReference> getScheduledMessages() {
return scheduledDeliveryHandler.getScheduledReferences();
@ -1153,7 +1205,22 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public int getDeliveringCount() {
return deliveringCount.get();
return deliveringMetrics.getMessageCount();
}
@Override
public long getDeliveringSize() {
return deliveringMetrics.getPersistentSize();
}
@Override
public int getDurableDeliveringCount() {
return deliveringMetrics.getDurableMessageCount();
}
@Override
public long getDurableDeliveringSize() {
return deliveringMetrics.getDurablePersistentSize();
}
@Override
@ -1239,7 +1306,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
getRefsOperation(tx).addAck(ref);
// https://issues.jboss.org/browse/HORNETQ-609
incDelivering();
incDelivering(ref);
messagesAcknowledged.incrementAndGet();
}
@ -1287,7 +1354,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
resetAllIterators();
} else {
decDelivering();
decDelivering(reference);
}
}
@ -1354,8 +1421,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
public void referenceHandled() {
incDelivering();
public void referenceHandled(MessageReference ref) {
incDelivering(ref);
}
@Override
@ -1419,7 +1486,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return iterQueue(flushLimit, filter1, new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
incDelivering();
incDelivering(ref);
acknowledge(tx, ref, ackReason);
refRemoved(ref);
}
@ -1539,7 +1606,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) {
incDelivering();
incDelivering(ref);
acknowledge(tx, ref);
iter.remove();
refRemoved(ref);
@ -1618,7 +1685,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) {
incDelivering();
incDelivering(ref);
expire(ref);
iter.remove();
refRemoved(ref);
@ -1644,7 +1711,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage())) {
incDelivering();
incDelivering(ref);
expire(tx, ref);
iter.remove();
refRemoved(ref);
@ -1711,7 +1778,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (tx == null) {
tx = new TransactionImpl(storageManager);
}
incDelivering();
incDelivering(ref);
expired = true;
expire(tx, ref);
iter.remove();
@ -1763,7 +1830,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) {
incDelivering();
incDelivering(ref);
sendToDeadLetterAddress(null, ref);
iter.remove();
refRemoved(ref);
@ -1782,7 +1849,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage())) {
incDelivering();
incDelivering(ref);
sendToDeadLetterAddress(null, ref);
iter.remove();
refRemoved(ref);
@ -1804,11 +1871,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (ref.getMessage().getMessageID() == messageID) {
iter.remove();
refRemoved(ref);
incDelivering();
incDelivering(ref);
try {
move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL);
} catch (Exception e) {
decDelivering();
decDelivering(ref);
throw e;
}
return true;
@ -1836,7 +1903,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
boolean ignored = false;
incDelivering();
incDelivering(ref);
if (rejectDuplicates) {
byte[] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
@ -1881,7 +1948,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (originalMessageAddress != null) {
incDelivering();
incDelivering(ref);
Long targetQueue = null;
if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) {
@ -2065,6 +2132,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private synchronized void internalAddTail(final MessageReference ref) {
refAdded(ref);
messageReferences.addTail(ref, getPriority(ref));
pendingMetrics.incrementMetrics(ref);
}
/**
@ -2076,6 +2144,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
*/
private void internalAddHead(final MessageReference ref) {
queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
pendingMetrics.incrementMetrics(ref);
refAdded(ref);
int priority = getPriority(ref);
@ -2330,6 +2399,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
protected void refRemoved(MessageReference ref) {
queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
pendingMetrics.decrementMetrics(ref);
if (ref.isPaged()) {
pagedReferences.decrementAndGet();
}
@ -2379,6 +2449,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
addTail(reference, false);
pageIterator.remove();
//We have to increment this here instead of in the iterator so we have access to the reference from next()
pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
}
if (logger.isDebugEnabled()) {
@ -2387,7 +2460,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (logger.isDebugEnabled()) {
logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringCount.get());
logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringMetrics.getMessageCount());
}
}
@ -2466,7 +2539,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
decDelivering();
decDelivering(reference);
return true;
}
@ -2890,7 +2963,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void postAcknowledge(final MessageReference ref) {
QueueImpl queue = (QueueImpl) ref.getQueue();
queue.decDelivering();
queue.decDelivering(ref);
if (ref.isPaged()) {
// nothing to be done
@ -2958,7 +3031,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
try {
Transaction transaction = new TransactionImpl(storageManager);
for (MessageReference reference : refs) {
incDelivering(); // post ack will decrement this, so need to inc
incDelivering(reference); // post ack will decrement this, so need to inc
acknowledge(transaction, reference, AckReason.KILLED);
}
transaction.commit();
@ -3264,17 +3337,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
private int incDelivering() {
return deliveringCount.incrementAndGet();
private void incDelivering(MessageReference ref) {
deliveringMetrics.incrementMetrics(ref);
}
public void decDelivering() {
deliveringCount.decrementAndGet();
public void decDelivering(final MessageReference reference) {
deliveringMetrics.decrementMetrics(reference);
}
@Override
public void decDelivering(int size) {
deliveringCount.addAndGet(-size);
private long getPersistentSize(final MessageReference reference) {
long size = 0;
try {
size = reference.getPersistentSize() > 0 ? reference.getPersistentSize() : 0;
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorCalculatePersistentSize(e);
}
return size;
}
private void configureExpiry(final AddressSettings settings) {

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import com.google.common.base.Preconditions;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
public class QueuePendingMessageMetrics {
private static final AtomicIntegerFieldUpdater<QueuePendingMessageMetrics> COUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "messageCount");
private static final AtomicIntegerFieldUpdater<QueuePendingMessageMetrics> DURABLE_COUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "durableMessageCount");
private static final AtomicLongFieldUpdater<QueuePendingMessageMetrics> SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "persistentSize");
private static final AtomicLongFieldUpdater<QueuePendingMessageMetrics> DURABLE_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "durablePersistentSize");
private volatile int messageCount;
private volatile long persistentSize;
private volatile int durableMessageCount;
private volatile long durablePersistentSize;
private final Queue queue;
public QueuePendingMessageMetrics(final Queue queue) {
Preconditions.checkNotNull(queue);
this.queue = queue;
}
public void incrementMetrics(final MessageReference reference) {
long size = getPersistentSize(reference);
COUNT_UPDATER.incrementAndGet(this);
SIZE_UPDATER.addAndGet(this, size);
if (queue.isDurable() && reference.getMessage().isDurable()) {
DURABLE_COUNT_UPDATER.incrementAndGet(this);
DURABLE_SIZE_UPDATER.addAndGet(this, size);
}
}
public void decrementMetrics(final MessageReference reference) {
long size = -getPersistentSize(reference);
COUNT_UPDATER.decrementAndGet(this);
SIZE_UPDATER.addAndGet(this, size);
if (queue.isDurable() && reference.getMessage().isDurable()) {
DURABLE_COUNT_UPDATER.decrementAndGet(this);
DURABLE_SIZE_UPDATER.addAndGet(this, size);
}
}
/**
* @return the messageCount
*/
public int getMessageCount() {
return messageCount;
}
/**
* @param messageCount the messageCount to set
*/
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
/**
* @return the persistentSize
*/
public long getPersistentSize() {
return persistentSize;
}
/**
* @param persistentSize the persistentSize to set
*/
public void setPersistentSize(long persistentSize) {
this.persistentSize = persistentSize;
}
/**
* @return the durableMessageCount
*/
public int getDurableMessageCount() {
return durableMessageCount;
}
/**
* @param durableMessageCount the durableMessageCount to set
*/
public void setDurableMessageCount(int durableMessageCount) {
this.durableMessageCount = durableMessageCount;
}
/**
* @return the durablePersistentSize
*/
public long getDurablePersistentSize() {
return durablePersistentSize;
}
/**
* @param durablePersistentSize the durablePersistentSize to set
*/
public void setDurablePersistentSize(long durablePersistentSize) {
this.durablePersistentSize = durablePersistentSize;
}
private long getPersistentSize(final MessageReference reference) {
long size = 0;
try {
size = reference.getPersistentSize() > 0 ? reference.getPersistentSize() : 0;
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorCalculatePersistentSize(e);
}
return size;
}
}

View File

@ -50,8 +50,12 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
// just adding some information to keep it in order accordingly to the initial operations
private final TreeSet<RefScheduled> scheduledReferences = new TreeSet<>(new MessageReferenceComparator());
public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor) {
private final QueuePendingMessageMetrics metrics;
public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor,
final Queue queue) {
this.scheduledExecutor = scheduledExecutor;
this.metrics = new QueuePendingMessageMetrics(queue);
}
@Override
@ -76,13 +80,27 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
synchronized (scheduledReferences) {
scheduledReferences.add(new RefScheduled(ref, tail));
}
metrics.incrementMetrics(ref);
}
@Override
public int getScheduledCount() {
synchronized (scheduledReferences) {
return scheduledReferences.size();
}
return metrics.getMessageCount();
}
@Override
public int getDurableScheduledCount() {
return metrics.getDurableMessageCount();
}
@Override
public long getScheduledSize() {
return metrics.getPersistentSize();
}
@Override
public long getDurableScheduledSize() {
return metrics.getDurablePersistentSize();
}
@Override
@ -109,6 +127,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
if (filter == null || filter.match(ref.getMessage())) {
iter.remove();
refs.add(ref);
metrics.decrementMetrics(ref);
}
}
}
@ -123,6 +142,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
MessageReference ref = iter.next().getRef();
if (ref.getMessage().getMessageID() == id) {
iter.remove();
metrics.decrementMetrics(ref);
return ref;
}
}
@ -205,6 +225,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
}
iter.remove();
metrics.decrementMetrics(reference);
reference.setScheduledDeliveryTime(0);

View File

@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessage;
@ -63,7 +64,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
@Test
public void testScheduleRandom() throws Exception {
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
long nextMessage = 0;
long NUMBER_OF_SEQUENCES = 100000;
@ -88,7 +89,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
@Test
public void testScheduleSameTimeHeadAndTail() throws Exception {
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
long time = System.currentTimeMillis() + 10000;
for (int i = 10001; i < 20000; i++) {
@ -110,7 +111,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
@Test
public void testScheduleFixedSample() throws Exception {
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
addMessage(handler, 0, 48L, true);
addMessage(handler, 1, 75L, true);
@ -124,7 +125,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
@Test
public void testScheduleWithAddHeads() throws Exception {
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
addMessage(handler, 0, 1, true);
addMessage(handler, 1, 2, true);
@ -145,7 +146,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
@Test
public void testScheduleFixedSampleTailAndHead() throws Exception {
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
// mix a sequence of tails / heads, but at the end this was supposed to be all sequential
addMessage(handler, 1, 48L, true);
@ -191,8 +192,9 @@ public class ScheduledDeliveryHandlerTest extends Assert {
private void internalSchedule(ExecutorService executor, ScheduledThreadPoolExecutor scheduler) throws Exception {
final int NUMBER_OF_MESSAGES = 200;
int NUMBER_OF_THREADS = 20;
final ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(scheduler);
final FakeQueueForScheduleUnitTest fakeQueue = new FakeQueueForScheduleUnitTest(NUMBER_OF_MESSAGES * NUMBER_OF_THREADS);
final ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(scheduler, fakeQueue);
final long now = System.currentTimeMillis();
@ -776,6 +778,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
public void sendBuffer(ByteBuf buffer, int count) {
}
@Override
public long getPersistentSize() throws ActiveMQException {
return 0;
}
}
public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {
@ -1016,13 +1023,53 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return 0;
}
@Override
public long getPersistentSize() {
return 0;
}
@Override
public long getDurableMessageCount() {
return 0;
}
@Override
public long getDurablePersistentSize() {
return 0;
}
@Override
public int getDeliveringCount() {
return 0;
}
@Override
public void referenceHandled() {
public long getDeliveringSize() {
return 0;
}
@Override
public int getDurableDeliveringCount() {
return 0;
}
@Override
public long getDurableDeliveringSize() {
return 0;
}
@Override
public int getDurableScheduledCount() {
return 0;
}
@Override
public long getDurableScheduledSize() {
return 0;
}
@Override
public void referenceHandled(MessageReference ref) {
}
@ -1031,6 +1078,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return 0;
}
@Override
public long getScheduledSize() {
return 0;
}
@Override
public List<MessageReference> getScheduledMessages() {
return null;
@ -1310,7 +1362,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
public SimpleString getUser() {
return null;
}
@Override
public boolean isLastValue() {
return false;
@ -1326,13 +1377,5 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void decDelivering(int size) {
}
}
}

View File

@ -587,7 +587,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
public long storePageCounter(long txID, long queueID, long value) throws Exception {
public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
return 0;
}
@ -612,12 +612,12 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
public long storePageCounterInc(long txID, long queueID, int add) throws Exception {
public long storePageCounterInc(long txID, long queueID, int add, long size) throws Exception {
return 0;
}
@Override
public long storePageCounterInc(long queueID, int add) throws Exception {
public long storePageCounterInc(long queueID, int add, long size) throws Exception {
return 0;
}

View File

@ -0,0 +1,23 @@
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.server.Queue
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue"))
queue.getPageSubscription().getPagingStore().startPaging();

View File

@ -0,0 +1,23 @@
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.server.Queue
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue"))
GroovyRun.assertTrue(queue.getPageSubscription().getPagingStore().isPaging())

View File

@ -0,0 +1,37 @@
package metrics
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
//validate metrics are recovered
Object[] queueControls = server.getJMSServerManager().getActiveMQServer().getManagementService().getResources(QueueControl.class);
for (Object o : queueControls) {
QueueControl c = (QueueControl) o;
GroovyRun.assertTrue(c.getPersistentSize() > 0);
GroovyRun.assertTrue(c.getDurablePersistentSize() > 0);
GroovyRun.assertEquals(16l, c.getMessageCount());
GroovyRun.assertEquals(16l, c.getDurableMessageCount());
}

View File

@ -19,7 +19,8 @@ package servers
// starts an artemis server
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.JournalType
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
@ -31,8 +32,10 @@ String id = arg[1];
String type = arg[2];
String producer = arg[3];
String consumer = arg[4];
String globalMaxSize = arg[5];
println("type = " + type);
println("globalMaxSize = " + globalMaxSize);
configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
@ -44,6 +47,10 @@ configuration.setPersistenceEnabled(persistent);
try {
if (!type.startsWith("ARTEMIS-1")) {
configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateAddresses(true));
if (globalMaxSize != null) {
configuration.getAddressesSettings().get("#").setPageSizeBytes(globalMaxSize).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
configuration.setGlobalMaxSize(Long.parseLong(globalMaxSize));
}
}
} catch (Throwable e) {
// need to ignore this for 1.4

View File

@ -17,6 +17,9 @@
package org.apache.activemq.artemis.tests.compatibility;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -28,9 +31,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
/**
* To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
*
@ -105,5 +105,45 @@ public class JournalCompatibilityTest extends VersionedBaseTest {
evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
}
/**
* Test that the server starts properly using an old journal even though persistent size
* metrics were not originaly stored
*/
@Test
public void testSendReceiveQueueMetrics() throws Throwable {
setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "journalTest");
evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
stopServer(senderClassloader);
setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest");
setVariable(receiverClassloader, "latch", null);
evaluate(receiverClassloader, "metrics/queueMetrics.groovy", server, receiver, "receiveMessages");
}
/**
* Test that the metrics are recovered when paging. Even though the paging counts won't
* be persisted the journal the server should still start properly. The persistent sizes
* will be recovered when the messages are depaged
*/
@Test
public void testSendReceiveSizeQueueMetricsPaging() throws Throwable {
setVariable(senderClassloader, "persistent", true);
//Set max size to 1 to cause messages to immediately go to the paging store
startServer(serverFolder.getRoot(), senderClassloader, "journalTest", Long.toString(1));
evaluate(senderClassloader, "journalcompatibility/forcepaging.groovy");
evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
evaluate(senderClassloader, "journalcompatibility/ispaging.groovy");
stopServer(senderClassloader);
setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", Long.toString(1));
evaluate(receiverClassloader, "journalcompatibility/ispaging.groovy");
evaluate(receiverClassloader, "metrics/queueMetrics.groovy", server, receiver, "receiveMessages");
}
}

View File

@ -189,6 +189,10 @@ public abstract class VersionedBaseTest {
}
public void startServer(File folder, ClassLoader loader, String serverName) throws Throwable {
startServer(folder, loader, serverName, null);
}
public void startServer(File folder, ClassLoader loader, String serverName, String globalMaxSize) throws Throwable {
folder.mkdirs();
System.out.println("Folder::" + folder);
@ -202,9 +206,8 @@ public abstract class VersionedBaseTest {
scriptToUse = "servers/hornetqServer.groovy";
}
evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver);
evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver, globalMaxSize);
}
public void stopServer(ClassLoader loader) throws Throwable {
execute(loader, "server.stop()");
}

View File

@ -817,5 +817,10 @@ public class AcknowledgeTest extends ActiveMQTestBase {
public Map<String, Object> toPropertyMap() {
return null;
}
@Override
public long getPersistentSize() throws ActiveMQException {
return 0;
}
}
}

View File

@ -668,8 +668,8 @@ public class SendAckFailTest extends ActiveMQTestBase {
}
@Override
public long storePageCounter(long txID, long queueID, long value) throws Exception {
return manager.storePageCounter(txID, queueID, value);
public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
return manager.storePageCounter(txID, queueID, value, size);
}
@Override
@ -693,13 +693,13 @@ public class SendAckFailTest extends ActiveMQTestBase {
}
@Override
public long storePageCounterInc(long txID, long queueID, int add) throws Exception {
return manager.storePageCounterInc(txID, queueID, add);
public long storePageCounterInc(long txID, long queueID, int add, long size) throws Exception {
return manager.storePageCounterInc(txID, queueID, add, size);
}
@Override
public long storePageCounterInc(long queueID, int add) throws Exception {
return manager.storePageCounterInc(queueID, add);
public long storePageCounterInc(long queueID, int add, long size) throws Exception {
return manager.storePageCounterInc(queueID, add, size);
}
@Override

View File

@ -122,6 +122,21 @@ public abstract class ManagementTestBase extends ActiveMQTestBase {
return control.getMessageCount();
}
protected long getDurableMessageCount(QueueControl control) throws Exception {
control.flushExecutor();
return control.getDurableMessageCount();
}
protected long getMessageSize(QueueControl control) throws Exception {
control.flushExecutor();
return control.getPersistentSize();
}
protected long getDurableMessageSize(QueueControl control) throws Exception {
control.flushExecutor();
return control.getDurablePersistentSize();
}
protected long getMessagesAdded(QueueControl control) throws Exception {
control.flushExecutor();
return control.getMessagesAdded();

View File

@ -16,16 +16,24 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import javax.management.openmbean.CompositeData;
import java.util.HashMap;
import java.util.Map;
import javax.management.openmbean.CompositeData;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
public class QueueControlUsingCoreTest extends QueueControlTest {
public QueueControlUsingCoreTest(boolean durable) {
super(durable);
}
@Override
protected QueueControl createManagementControl(final SimpleString address,
final SimpleString queue) throws Exception {
@ -116,6 +124,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (Integer) proxy.retrieveAttributeValue("deliveringCount", Integer.class);
}
@Override
public long getDeliveringSize() {
return (Long) proxy.retrieveAttributeValue("deliveringSize", Long.class);
}
@Override
public int getDurableDeliveringCount() {
return (Integer) proxy.retrieveAttributeValue("durableDeliveringCount", Integer.class);
}
@Override
public long getDurableDeliveringSize() {
return (Long) proxy.retrieveAttributeValue("durableDeliveringSize", Long.class);
}
@Override
public String getExpiryAddress() {
return (String) proxy.retrieveAttributeValue("expiryAddress");
@ -186,6 +209,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (Long) proxy.retrieveAttributeValue("scheduledCount", Long.class);
}
@Override
public long getScheduledSize() {
return (Long) proxy.retrieveAttributeValue("scheduledSize", Long.class);
}
@Override
public long getDurableScheduledCount() {
return (Long) proxy.retrieveAttributeValue("durableScheduledCount", Long.class);
}
@Override
public long getDurableScheduledSize() {
return (Long) proxy.retrieveAttributeValue("durableScheduledSize", Long.class);
}
@Override
public boolean isDurable() {
return (Boolean) proxy.retrieveAttributeValue("durable");
@ -455,6 +493,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
public String listDeliveringMessagesAsJSON() throws Exception {
return (String) proxy.invokeOperation("listDeliveringMessagesAsJSON");
}
@Override
public long getPersistentSize() {
return (Long) proxy.retrieveAttributeValue("persistentSize", Long.class);
}
@Override
public long getDurableMessageCount() {
return (Long) proxy.retrieveAttributeValue("durableMessageCount", Long.class);
}
@Override
public long getDurablePersistentSize() {
return (Long) proxy.retrieveAttributeValue("durablePersistentSize", Long.class);
}
};
}
}

View File

@ -89,15 +89,17 @@ public class PagingCounterTest extends ActiveMQTestBase {
Transaction tx = new TransactionImpl(server.getStorageManager());
counter.increment(tx, 1);
counter.increment(tx, 1, 1000);
assertEquals(0, counter.getValue());
assertEquals(0, counter.getPersistentSize());
tx.commit();
storage.waitOnOperations();
assertEquals(1, counter.getValue());
assertEquals(1000, counter.getPersistentSize());
} finally {
sf.close();
session.close();
@ -121,7 +123,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
for (int i = 0; i < 2100; i++) {
counter.increment(tx, 1);
counter.increment(tx, 1, 1000);
if (i % 200 == 0) {
tx.commit();
@ -129,6 +131,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
storage.waitOnOperations();
assertEquals(i + 1, counter.getValue());
assertEquals((i + 1) * 1000, counter.getPersistentSize());
tx = new TransactionImpl(server.getStorageManager());
}
@ -139,6 +142,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
storage.waitOnOperations();
assertEquals(2100, counter.getValue());
assertEquals(2100 * 1000, counter.getPersistentSize());
server.stop();
@ -153,6 +157,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
counter = locateCounter(queue);
assertEquals(2100, counter.getValue());
assertEquals(2100 * 1000, counter.getPersistentSize());
} finally {
sf.close();
@ -180,7 +185,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
for (int i = 0; i < 2100; i++) {
counter.increment(tx, 1);
counter.increment(tx, 1, 1000);
if (i % 200 == 0) {
tx.commit();
@ -188,6 +193,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
storage.waitOnOperations();
assertEquals(i + 1, counter.getValue());
assertEquals((i + 1) * 1000, counter.getPersistentSize());
tx = new TransactionImpl(server.getStorageManager());
}
@ -198,6 +204,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
storage.waitOnOperations();
assertEquals(2100, counter.getValue());
assertEquals(2100 * 1000, counter.getPersistentSize());
server.stop();
@ -212,6 +219,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
counter = locateCounter(queue);
assertEquals(0, counter.getValue());
assertEquals(0, counter.getPersistentSize());
} finally {
sf.close();
@ -230,15 +238,17 @@ public class PagingCounterTest extends ActiveMQTestBase {
Transaction tx = new TransactionImpl(server.getStorageManager());
counter.increment(tx, 1);
counter.increment(tx, 1, 1000);
assertEquals(0, counter.getValue());
assertEquals(0, counter.getPersistentSize());
tx.commit();
storage.waitOnOperations();
assertEquals(1, counter.getValue());
assertEquals(1000, counter.getPersistentSize());
sl.close();
@ -255,6 +265,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
counter = locateCounter(queue);
assertEquals(1, counter.getValue());
assertEquals(1000, counter.getPersistentSize());
}
@ -283,7 +294,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
Transaction tx = new TransactionImpl(xid, server.getStorageManager(), 300);
for (int i = 0; i < 2000; i++) {
counter.increment(tx, 1);
counter.increment(tx, 1, 1000);
}
assertEquals(0, counter.getValue());

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.persistence.metrics;
import java.util.Enumeration;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*
*/
public abstract class AbstractPersistentStatTestSupport extends JMSTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractPersistentStatTestSupport.class);
protected static int defaultMessageSize = 1000;
@Override
protected boolean usePersistence() {
return true;
}
protected void consumeTestQueueMessages(String queueName, int num) throws Exception {
// Start the connection
Connection connection = cf.createConnection();
connection.setClientID("clientId2" + queueName);
connection.start();
Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer;
try {
consumer = session.createConsumer(queue);
for (int i = 0; i < num; i++) {
consumer.receive();
}
consumer.close();
} finally {
// consumer.close();
connection.close();
}
}
protected void browseTestQueueMessages(String queueName) throws Exception {
// Start the connection
Connection connection = cf.createConnection();
connection.setClientID("clientId2" + queueName);
connection.start();
Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
try {
QueueBrowser queueBrowser = session.createBrowser(queue);
@SuppressWarnings("unchecked")
Enumeration<Message> messages = queueBrowser.getEnumeration();
while (messages.hasMoreElements()) {
messages.nextElement();
}
} finally {
connection.close();
}
}
protected void consumeDurableTestMessages(Connection connection, String sub, int size, String topicName,
AtomicLong publishedMessageSize) throws Exception {
Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
try {
TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
for (int i = 0; i < size; i++) {
ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
if (publishedMessageSize != null) {
publishedMessageSize.addAndGet(-message.getCoreMessage().getEncodeSize());
}
}
} finally {
session.close();
}
}
protected void publishTestQueueMessages(int count, String queueName, int deliveryMode, int messageSize,
AtomicLong publishedMessageSize, boolean transacted) throws Exception {
// Start the connection
Connection connection = cf.createConnection();
connection.setClientID("clientId" + queueName);
connection.start();
Session session = transacted ? connection.createSession(transacted, QueueSession.SESSION_TRANSACTED) :
connection.createSession(transacted, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
try {
MessageProducer prod = session.createProducer(queue);
prod.setDeliveryMode(deliveryMode);
for (int i = 0; i < count; i++) {
prod.send(createMessage(i, session, messageSize, publishedMessageSize));
}
if (transacted) {
session.commit();
}
} finally {
connection.close();
}
}
protected void publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, boolean verifyBrowsing,
boolean shared)
throws Exception {
this.publishTestMessagesDurable(connection, subNames, topicName, publishSize, expectedSize, messageSize,
publishedMessageSize, verifyBrowsing, DeliveryMode.PERSISTENT, shared);
}
protected void publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, boolean verifyBrowsing,
int deliveryMode, boolean shared) throws Exception {
Session session = connection.createSession(false, TopicSession.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
for (String subName : subNames) {
if (shared) {
session.createSharedDurableConsumer(topic, subName);
} else {
session.createDurableSubscriber(topic, subName);
}
}
try {
// publish a bunch of non-persistent messages to fill up the temp
// store
MessageProducer prod = session.createProducer(topic);
prod.setDeliveryMode(deliveryMode);
for (int i = 0; i < publishSize; i++) {
prod.send(createMessage(i, session, messageSize, publishedMessageSize));
}
} finally {
session.close();
}
}
/**
* Generate random messages between 100 bytes and maxMessageSize
*
* @param session
* @return
* @throws JMSException
* @throws ActiveMQException
*/
protected BytesMessage createMessage(int count, Session session, int maxMessageSize, AtomicLong publishedMessageSize)
throws JMSException, ActiveMQException {
final ActiveMQBytesMessage message = (ActiveMQBytesMessage) session.createBytesMessage();
final Random randomSize = new Random();
int size = randomSize.nextInt((maxMessageSize - 100) + 1) + 100;
final byte[] data = new byte[size];
final Random rng = new Random();
rng.nextBytes(data);
message.writeBytes(data);
if (publishedMessageSize != null) {
publishedMessageSize.addAndGet(message.getCoreMessage().getPersistentSize());
}
return message;
}
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.persistence.metrics;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class JournalPageCountSizeTest extends ActiveMQTestBase {
private ActiveMQServer server;
@Before
public void init() throws Exception {
server = createServer(true);
server.start();
}
@Override
protected ConfigurationImpl createBasicConfig(int serverID) {
return super.createBasicConfig(serverID);
}
@After
public void destroy() throws Exception {
server.stop();
}
@Test
public void testPageCountRecordSize() throws Exception {
long tx = server.getStorageManager().generateID();
server.getStorageManager().storePageCounter(tx, 1, 1, 100);
server.getStorageManager().commit(tx);
server.getStorageManager().stop();
JournalStorageManager journalStorageManager = (JournalStorageManager) server.getStorageManager();
List<RecordInfo> committedRecords = new LinkedList<>();
List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
try {
journalStorageManager.getMessageJournal().start();
journalStorageManager.getMessageJournal().load(committedRecords, preparedTransactions, transactionFailure);
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(committedRecords.get(0).data);
PageCountRecord encoding = new PageCountRecord();
encoding.decode(buff);
Assert.assertEquals(100, encoding.getPersistentSize());
} finally {
journalStorageManager.getMessageJournal().stop();
}
}
@Test
public void testPageCursorCounterRecordSize() throws Exception {
server.getStorageManager().storePageCounterInc(1, 1, 1000);
server.getStorageManager().stop();
JournalStorageManager journalStorageManager = (JournalStorageManager) server.getStorageManager();
List<RecordInfo> committedRecords = new LinkedList<>();
List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
try {
journalStorageManager.getMessageJournal().start();
journalStorageManager.getMessageJournal().load(committedRecords, preparedTransactions, transactionFailure);
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(committedRecords.get(0).data);
PageCountRecordInc encoding = new PageCountRecordInc();
encoding.decode(buff);
Assert.assertEquals(1000, encoding.getPersistentSize());
} finally {
journalStorageManager.getMessageJournal().stop();
}
}
@Test
public void testPageCursorCounterRecordSizeTX() throws Exception {
long tx = server.getStorageManager().generateID();
server.getStorageManager().storePageCounterInc(tx, 1, 1, 1000);
server.getStorageManager().commit(tx);
server.getStorageManager().stop();
JournalStorageManager journalStorageManager = (JournalStorageManager) server.getStorageManager();
List<RecordInfo> committedRecords = new LinkedList<>();
List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
try {
journalStorageManager.getMessageJournal().start();
journalStorageManager.getMessageJournal().load(committedRecords, preparedTransactions, transactionFailure);
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(committedRecords.get(0).data);
PageCountRecordInc encoding = new PageCountRecordInc();
encoding.decode(buff);
Assert.assertEquals(1000, encoding.getPersistentSize());
} finally {
journalStorageManager.getMessageJournal().stop();
}
}
private TransactionFailureCallback transactionFailure = new TransactionFailureCallback() {
@Override
public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete) {
}
};
}

View File

@ -0,0 +1,651 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.persistence.metrics;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ToLongFunction;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.apache.activemq.artemis.junit.Wait.Condition;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.commons.lang.StringUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JournalPendingMessageTest extends AbstractPersistentStatTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(JournalPendingMessageTest.class);
// protected URI brokerConnectURI;
protected String defaultQueueName = "test.queue";
protected String defaultTopicName = "test.topic";
protected static int maxMessageSize = 1000;
@Before
public void setupAddresses() throws Exception {
server.getPostOffice()
.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(defaultQueueName), RoutingType.ANYCAST));
server.createQueue(SimpleString.toSimpleString(defaultQueueName), RoutingType.ANYCAST,
SimpleString.toSimpleString(defaultQueueName), null, true, false);
}
@Override
protected Configuration createDefaultConfig(boolean netty) throws Exception {
Configuration config = super.createDefaultConfig(netty);
// Set a low max size so we page which will test the paging metrics as
// well
config.setGlobalMaxSize(100000);
return config;
}
@Test
public void testQueueMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
publishTestQueueMessages(200, publishedMessageSize);
verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
this.killServer();
this.restartServer();
verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
}
@Test
public void testQueueMessageSizeTx() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
publishTestQueueMessagesTx(200, publishedMessageSize);
verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
this.killServer();
this.restartServer();
verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
}
@Test
public void testQueueLargeMessageSize() throws Exception {
ActiveMQConnectionFactory acf = (ActiveMQConnectionFactory) cf;
acf.setMinLargeMessageSize(1000);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String testText = StringUtils.repeat("t", 5000);
ActiveMQTextMessage message = (ActiveMQTextMessage) session.createTextMessage(testText);
session.createProducer(session.createQueue(defaultQueueName)).send(message);
verifyPendingStats(defaultQueueName, 1, message.getCoreMessage().getPersistentSize());
verifyPendingDurableStats(defaultQueueName, 1, message.getCoreMessage().getPersistentSize());
connection.close();
this.killServer();
this.restartServer();
verifyPendingStats(defaultQueueName, 1, message.getCoreMessage().getPersistentSize());
verifyPendingDurableStats(defaultQueueName, 1, message.getCoreMessage().getPersistentSize());
}
@Test
public void testQueueLargeMessageSizeTX() throws Exception {
ActiveMQConnectionFactory acf = (ActiveMQConnectionFactory) cf;
acf.setMinLargeMessageSize(1000);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
String testText = StringUtils.repeat("t", 2000);
MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
ActiveMQTextMessage message = (ActiveMQTextMessage) session.createTextMessage(testText);
for (int i = 0; i < 10; i++) {
producer.send(message);
}
//not commited so should be 0
verifyPendingStats(defaultQueueName, 0, message.getCoreMessage().getPersistentSize() * 10);
verifyPendingDurableStats(defaultQueueName, 0, message.getCoreMessage().getPersistentSize() * 10);
session.commit();
verifyPendingStats(defaultQueueName, 10, message.getCoreMessage().getPersistentSize() * 10);
verifyPendingDurableStats(defaultQueueName, 10, message.getCoreMessage().getPersistentSize() * 10);
connection.close();
this.killServer();
this.restartServer();
verifyPendingStats(defaultQueueName, 10, message.getCoreMessage().getPersistentSize());
verifyPendingDurableStats(defaultQueueName, 10, message.getCoreMessage().getPersistentSize());
}
@Test
public void testQueueBrowserMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
publishTestQueueMessages(200, publishedMessageSize);
browseTestQueueMessages(defaultQueueName);
verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
}
@Test
public void testQueueMessageSizeNonPersistent() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultQueueName, 0, 0);
}
@Test
public void testQueueMessageSizePersistentAndNonPersistent() throws Exception {
AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
AtomicLong publishedMessageSize = new AtomicLong();
publishTestQueueMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize);
publishTestQueueMessages(100, DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize);
verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get());
verifyPendingDurableStats(defaultQueueName, 100, publishedMessageSize.get());
}
@Test
public void testQueueMessageSizeAfterConsumption() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
publishTestQueueMessages(200, publishedMessageSize);
verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
consumeTestQueueMessages(200);
verifyPendingStats(defaultQueueName, 0, 0);
verifyPendingDurableStats(defaultQueueName, 0, 0);
}
@Test
public void testScheduledStats() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
producer.setDeliveryDelay(2000);
producer.send(session.createTextMessage("test"));
verifyPendingStats(defaultQueueName, 1, publishedMessageSize.get());
verifyPendingDurableStats(defaultQueueName, 1, publishedMessageSize.get());
verifyScheduledStats(defaultQueueName, 1, publishedMessageSize.get());
consumeTestQueueMessages(1);
verifyPendingStats(defaultQueueName, 0, 0);
verifyPendingDurableStats(defaultQueueName, 0, 0);
verifyScheduledStats(defaultQueueName, 0, 0);
connection.close();
}
@Test
public void testDeliveringStats() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
producer.send(session.createTextMessage("test"));
verifyPendingStats(defaultQueueName, 1, publishedMessageSize.get());
verifyPendingDurableStats(defaultQueueName, 1, publishedMessageSize.get());
verifyDeliveringStats(defaultQueueName, 0, 0);
MessageConsumer consumer = session.createConsumer(session.createQueue(defaultQueueName));
Message msg = consumer.receive();
verifyDeliveringStats(defaultQueueName, 1, publishedMessageSize.get());
msg.acknowledge();
verifyPendingStats(defaultQueueName, 0, 0);
verifyPendingDurableStats(defaultQueueName, 0, 0);
verifyDeliveringStats(defaultQueueName, 0, 0);
connection.close();
}
@Test
public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
consumeTestQueueMessages(200);
verifyPendingStats(defaultQueueName, 0, 0);
verifyPendingDurableStats(defaultQueueName, 0, 0);
}
@Test
public void testTopicMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = cf.createConnection();
connection.setClientID("clientId");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createTopic(defaultTopicName));
publishTestTopicMessages(200, publishedMessageSize);
verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultQueueName, 0, 0);
// consume all messages
consumeTestMessages(consumer, 200);
// All messages should now be gone
verifyPendingStats(defaultTopicName, 0, 0);
verifyPendingDurableStats(defaultQueueName, 0, 0);
connection.close();
}
@Test
public void testTopicMessageSizeShared() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = cf.createConnection();
connection.setClientID("clientId");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createSharedConsumer(session.createTopic(defaultTopicName), "sub1");
MessageConsumer consumer2 = session.createSharedConsumer(session.createTopic(defaultTopicName), "sub1");
publishTestTopicMessages(200, publishedMessageSize);
verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultTopicName, 0, 0);
consumer2.close();
// consume all messages
consumeTestMessages(consumer, 200);
// All messages should now be gone
verifyPendingStats(defaultTopicName, 0, 0);
verifyPendingDurableStats(defaultTopicName, 0, 0);
connection.close();
}
@Test
public void testTopicNonPersistentMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = cf.createConnection();
connection.setClientID("clientId");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createTopic(defaultTopicName));
publishTestTopicMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
// consume all messages
consumeTestMessages(consumer, 200);
// All messages should now be gone
verifyPendingStats(defaultTopicName, 0, 0);
connection.close();
}
@Test
public void testTopicPersistentAndNonPersistentMessageSize() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
Connection connection = cf.createConnection();
connection.setClientID("clientId");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createTopic(defaultTopicName));
publishTestTopicMessages(100, DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize);
publishTestTopicMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize);
verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get());
// consume all messages
consumeTestMessages(consumer, 200);
// All messages should now be gone
verifyPendingStats(defaultTopicName, 0, 0);
connection.close();
}
@Test
public void testMessageSizeOneDurable() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = cf.createConnection();
connection.setClientID("clientId");
connection.start();
publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, publishedMessageSize,
DeliveryMode.PERSISTENT, false);
// verify the count and size - durable is offline so all 200 should be
// pending since none are in prefetch
verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
// consume all messages
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
// All messages should now be gone
verifyPendingStats(defaultTopicName, 0, 0);
verifyPendingDurableStats(defaultTopicName, 0, 0);
connection.close();
}
@Test
public void testMessageSizeOneDurablePartialConsumption() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = cf.createConnection();
connection.setClientID("clientId");
connection.start();
publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, publishedMessageSize,
DeliveryMode.PERSISTENT, false);
verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
// consume partial messages
consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
// 150 should be left
verifyPendingStats(defaultTopicName, 150, publishedMessageSize.get());
// We don't really know the size here but it should be smaller than before
// so take an average
verifyPendingDurableStats(defaultTopicName, 150, (long) (.75 * publishedMessageSize.get()));
connection.close();
}
@Test
public void testMessageSizeTwoDurables() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = cf.createConnection();
connection.setClientID("clientId");
connection.start();
publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, publishedMessageSize,
DeliveryMode.PERSISTENT, false);
// verify the count and size - double because two durables so two queue
// bindings
verifyPendingStats(defaultTopicName, 400, 2 * publishedMessageSize.get());
verifyPendingDurableStats(defaultTopicName, 400, 2 * publishedMessageSize.get());
// consume messages just for sub1
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
// There is still a durable that hasn't consumed so the messages should
// exist
verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
connection.close();
// restart and verify load
this.killServer();
this.restartServer();
verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
}
@Test
public void testMessageSizeSharedDurable() throws Exception {
AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = cf.createConnection();
connection.setClientID("clientId");
connection.start();
// The publish method will create a second shared consumer
Session s = connection.createSession();
MessageConsumer c = s.createSharedDurableConsumer(s.createTopic(defaultTopicName), "sub1");
publishTestMessagesDurable(connection, new String[] {"sub1",}, 200, publishedMessageSize,
DeliveryMode.PERSISTENT, true);
// verify the count and size - double because two durables so two queue
// bindings
verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
c.close();
// consume messages for sub1
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
verifyPendingStats(defaultTopicName, 0, publishedMessageSize.get());
verifyPendingDurableStats(defaultTopicName, 0, publishedMessageSize.get());
connection.close();
}
protected List<Queue> getQueues(final String address) throws Exception {
final List<Queue> queues = new ArrayList<>();
for (Binding binding : server.getPostOffice().getDirectBindings(SimpleString.toSimpleString(address))
.getBindings()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
LocalQueueBinding queueBinding = (LocalQueueBinding) binding;
queues.add(queueBinding.getQueue());
}
}
return queues;
}
protected void verifyDeliveringStats(final String address, final int count, final long minimumSize) throws Exception {
verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getDeliveringCount,
org.apache.activemq.artemis.core.server.Queue::getDeliveringSize);
verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getDurableDeliveringCount,
org.apache.activemq.artemis.core.server.Queue::getDurableDeliveringSize);
}
protected void verifyScheduledStats(final String address, final int count, final long minimumSize) throws Exception {
verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getScheduledCount,
org.apache.activemq.artemis.core.server.Queue::getScheduledSize);
verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getDurableScheduledCount,
org.apache.activemq.artemis.core.server.Queue::getDurableScheduledSize);
}
protected void verifyPendingStats(final String address, final int count, final long minimumSize) throws Exception {
verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getMessageCount,
org.apache.activemq.artemis.core.server.Queue::getPersistentSize);
}
protected void verifyPendingDurableStats(final String address, final int count, final long minimumSize)
throws Exception {
verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getDurableMessageCount,
org.apache.activemq.artemis.core.server.Queue::getDurablePersistentSize);
}
protected void verifyStats(final String address, final int count, final long minimumSize,
ToLongFunction<Queue> countFunc, ToLongFunction<Queue> sizeFunc)
throws Exception {
final List<Queue> queues = getQueues(address);
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisfied() throws Exception {
return queues.stream().mapToLong(countFunc)
.sum() == count;
}
}));
verifySize(count, new MessageSizeCalculator() {
@Override
public long getMessageSize() throws Exception {
return queues.stream().mapToLong(sizeFunc)
.sum();
}
}, minimumSize);
}
protected void verifySize(final int count, final MessageSizeCalculator messageSizeCalculator, final long minimumSize)
throws Exception {
if (count > 0) {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisfied() throws Exception {
return messageSizeCalculator.getMessageSize() > minimumSize;
}
}));
} else {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisfied() throws Exception {
return messageSizeCalculator.getMessageSize() == 0;
}
}));
}
}
protected interface MessageSizeCalculator {
long getMessageSize() throws Exception;
}
protected void consumeTestMessages(MessageConsumer consumer, int size) throws Exception {
consumeTestMessages(consumer, size, defaultTopicName);
}
protected void consumeTestMessages(MessageConsumer consumer, int size, String topicName) throws Exception {
for (int i = 0; i < size; i++) {
consumer.receive();
}
}
protected void consumeDurableTestMessages(Connection connection, String sub, int size,
AtomicLong publishedMessageSize) throws Exception {
consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
}
protected void publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize,
AtomicLong publishedMessageSize, int deliveryMode, boolean shared) throws Exception {
publishTestMessagesDurable(connection, subNames, defaultTopicName, publishSize, 0,
AbstractPersistentStatTestSupport.defaultMessageSize, publishedMessageSize, false, deliveryMode, shared);
}
protected void publishTestTopicMessages(int publishSize, AtomicLong publishedMessageSize) throws Exception {
publishTestTopicMessages(publishSize, DeliveryMode.PERSISTENT, publishedMessageSize);
}
protected void publishTestTopicMessages(int publishSize, int deliveryMode, AtomicLong publishedMessageSize)
throws Exception {
// create a new queue
Connection connection = cf.createConnection();
connection.setClientID("clientId2");
connection.start();
// Start the connection
Session session = connection.createSession(false, TopicSession.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(defaultTopicName);
try {
// publish a bunch of non-persistent messages to fill up the temp
// store
MessageProducer prod = session.createProducer(topic);
prod.setDeliveryMode(deliveryMode);
for (int i = 0; i < publishSize; i++) {
prod.send(createMessage(i, session, JournalPendingMessageTest.maxMessageSize, publishedMessageSize));
}
} finally {
connection.close();
}
}
protected void publishTestQueueMessagesTx(int count, AtomicLong publishedMessageSize) throws Exception {
publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
JournalPendingMessageTest.maxMessageSize, publishedMessageSize, true);
}
protected void publishTestQueueMessages(int count, AtomicLong publishedMessageSize) throws Exception {
publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
JournalPendingMessageTest.maxMessageSize, publishedMessageSize, false);
}
protected void publishTestQueueMessages(int count, int deliveryMode, AtomicLong publishedMessageSize)
throws Exception {
publishTestQueueMessages(count, defaultQueueName, deliveryMode, JournalPendingMessageTest.maxMessageSize,
publishedMessageSize, false);
}
protected void consumeTestQueueMessages(int num) throws Exception {
consumeTestQueueMessages(defaultQueueName, num);
}
}

View File

@ -362,6 +362,21 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return messageCount;
}
@Override
public long getPersistentSize() {
return 0;
}
@Override
public long getDurableMessageCount() {
return 0;
}
@Override
public long getDurablePersistentSize() {
return 0;
}
public void setMessageCount(long messageCount) {
this.messageCount = messageCount;
}
@ -452,6 +467,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return 0;
}
@Override
public long getScheduledSize() {
// no-op
return 0;
}
@Override
public List<MessageReference> getScheduledMessages() {
// no-op
@ -522,7 +543,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void referenceHandled() {
public void referenceHandled(MessageReference ref) {
// no-op
}
@ -684,6 +705,28 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void decDelivering(int size) {
public long getDeliveringSize() {
return 0;
}
@Override
public int getDurableDeliveringCount() {
return 0;
}
@Override
public long getDurableDeliveringSize() {
return 0;
}
@Override
public int getDurableScheduledCount() {
return 0;
}
@Override
public long getDurableScheduledSize() {
return 0;
}
}

View File

@ -105,7 +105,7 @@ public class FakeConsumer implements Consumer {
if (filter != null) {
if (filter.match(reference.getMessage())) {
references.addLast(reference);
reference.getQueue().referenceHandled();
reference.getQueue().referenceHandled(reference);
notify();
return HandleStatus.HANDLED;
@ -125,7 +125,7 @@ public class FakeConsumer implements Consumer {
}
if (statusToReturn == HandleStatus.HANDLED) {
reference.getQueue().referenceHandled();
reference.getQueue().referenceHandled(reference);
references.addLast(reference);
notify();
}