ARTEMIS-4774 Fixing PageCounters out of sync after AckMnager retries
This commit is contained in:
parent
22540cc3ab
commit
e47d8ea7c1
|
@ -202,9 +202,8 @@ public class AckManager implements ActiveMQComponent {
|
|||
|
||||
|
||||
// to be used with the same executor as the PagingStore executor
|
||||
public boolean retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
|
||||
public void retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
|
||||
MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
|
||||
boolean retriedPaging = false;
|
||||
logger.trace("retrying address {} on server {}", address, server);
|
||||
try {
|
||||
AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController);
|
||||
|
@ -219,14 +218,12 @@ public class AckManager implements ActiveMQComponent {
|
|||
logger.trace("Retry stopped while reading page {} on address {} as the outcome is now empty, server={}", pageId, address, server);
|
||||
break;
|
||||
}
|
||||
Page page = store.usePage(pageId, true, false);
|
||||
Page page = openPage(store, pageId);
|
||||
if (page == null) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
if (retryPage(acksToRetry, page, key)) {
|
||||
retriedPaging = true;
|
||||
}
|
||||
retryPage(acksToRetry, address, page, key);
|
||||
} finally {
|
||||
page.usageDown();
|
||||
}
|
||||
|
@ -241,7 +238,17 @@ public class AckManager implements ActiveMQComponent {
|
|||
} finally {
|
||||
AMQPMirrorControllerTarget.setControllerInUse(previousController);
|
||||
}
|
||||
return retriedPaging;
|
||||
}
|
||||
|
||||
private Page openPage(PagingStore store, long pageID) throws Throwable {
|
||||
Page page = store.newPageObject(pageID);
|
||||
if (page.getFile().exists()) {
|
||||
page.getMessages();
|
||||
return page;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void validateExpiredSet(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry) {
|
||||
|
@ -265,10 +272,11 @@ public class AckManager implements ActiveMQComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean retryPage(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry,
|
||||
private void retryPage(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry,
|
||||
SimpleString address,
|
||||
Page page,
|
||||
AckRetry key) throws Exception {
|
||||
AtomicBoolean retriedPaging = new AtomicBoolean(false);
|
||||
logger.debug("scanning for acks on page {} on address {}", page.getPageId(), address);
|
||||
TransactionImpl transaction = new TransactionImpl(server.getStorageManager()).setAsync(true);
|
||||
// scan each page for acks
|
||||
page.getMessages().forEach(pagedMessage -> {
|
||||
|
@ -297,8 +305,8 @@ public class AckManager implements ActiveMQComponent {
|
|||
if (!subscription.isAcked(pagedMessage)) {
|
||||
PagedReference reference = retries.getContext().getPagingStore().getCursorProvider().newReference(pagedMessage, subscription);
|
||||
try {
|
||||
subscription.ackTx(transaction, reference);
|
||||
retriedPaging.set(true);
|
||||
subscription.ackTx(transaction, reference, false);
|
||||
subscription.getQueue().postAcknowledge(reference, ackRetry.getReason(), false);
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
if (ioCriticalErrorListener != null) {
|
||||
|
@ -325,8 +333,6 @@ public class AckManager implements ActiveMQComponent {
|
|||
ioCriticalErrorListener.onIOException(e, e.getMessage(), null);
|
||||
}
|
||||
}
|
||||
|
||||
return retriedPaging.get();
|
||||
}
|
||||
|
||||
/** returns true if there are retries ready to be scanned on paging */
|
||||
|
@ -430,8 +436,6 @@ public class AckManager implements ActiveMQComponent {
|
|||
|
||||
Iterator<Map.Entry<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>>> retryIterator;
|
||||
|
||||
boolean retriedPaging = false;
|
||||
|
||||
|
||||
MultiStepProgress(HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> retryList) {
|
||||
this.retryList = retryList;
|
||||
|
@ -453,9 +457,7 @@ public class AckManager implements ActiveMQComponent {
|
|||
|
||||
PagingStore pagingStore = server.getPagingManager().getPageStore(entry.getKey());
|
||||
pagingStore.execute(() -> {
|
||||
if (AckManager.this.retryAddress(entry.getKey(), entry.getValue())) {
|
||||
retriedPaging = true;
|
||||
}
|
||||
AckManager.this.retryAddress(entry.getKey(), entry.getValue());
|
||||
nextStep();
|
||||
});
|
||||
}
|
||||
|
|
|
@ -177,4 +177,8 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
|
|||
default void forEachTransaction(BiConsumer<Long, PageTransactionInfo> transactionConsumer) {
|
||||
}
|
||||
|
||||
default boolean isRebuildingCounters() {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -87,10 +87,13 @@ public interface PageSubscription {
|
|||
// for internal (cursor) classes
|
||||
void confirmPosition(PagePosition ref) throws Exception;
|
||||
|
||||
void ackTx(Transaction tx, PagedReference position) throws Exception;
|
||||
void ackTx(Transaction tx, PagedReference position, boolean fromDelivery) throws Exception;
|
||||
|
||||
default void ackTx(Transaction tx, PagedReference position) throws Exception {
|
||||
ackTx(tx, position, true);
|
||||
}
|
||||
// for internal (cursor) classes
|
||||
void confirmPosition(Transaction tx, PagePosition position) throws Exception;
|
||||
void confirmPosition(Transaction tx, PagePosition position, boolean fromDelivery) throws Exception;
|
||||
|
||||
/**
|
||||
* @return the first page in use or MAX_LONG if none is in use
|
||||
|
@ -158,12 +161,6 @@ public interface PageSubscription {
|
|||
*/
|
||||
void onDeletePage(Page deletedPage) throws Exception;
|
||||
|
||||
long getDeliveredCount();
|
||||
|
||||
long getDeliveredSize();
|
||||
|
||||
void incrementDeliveredSize(long size);
|
||||
|
||||
void removePendingDelivery(PagedMessage pagedMessage);
|
||||
|
||||
ConsumedPage locatePageInfo(long pageNr);
|
||||
|
|
|
@ -44,7 +44,6 @@ import java.util.function.BiConsumer;
|
|||
|
||||
/** this class will copy current data from the Subscriptions, count messages while the server is already active
|
||||
* performing other activity */
|
||||
// TODO: Rename this as RebuildManager in a future major version
|
||||
public class PageCounterRebuildManager implements Runnable {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
@ -90,7 +89,7 @@ public class PageCounterRebuildManager implements Runnable {
|
|||
Page currentPage = store.getCurrentPage();
|
||||
limitPageId = store.getCurrentWritingPage();
|
||||
limitMessageNr = currentPage.getNumberOfMessages();
|
||||
if (logger.isDebugEnabled()) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -139,14 +138,6 @@ public class PageCounterRebuildManager implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
private synchronized PageSubscriptionCounter getCounter(long queueID) {
|
||||
CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
|
||||
if (copiedSubscription != null) {
|
||||
return copiedSubscription.subscriptionCounter;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private CopiedSubscription getSubscription(long queueID) {
|
||||
return copiedSubscriptionMap.get(queueID);
|
||||
|
|
|
@ -183,10 +183,10 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter {
|
|||
recordedSizeUpdater.set(this, size);
|
||||
valueUpdater.set(this, value);
|
||||
persistentSizeUpdater.set(this, size);
|
||||
addedUpdater.set(this, size);
|
||||
addedUpdater.set(this, value);
|
||||
}
|
||||
|
||||
private void process(int add, long size) {
|
||||
private void process(final int add, final long size) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("process subscription={} add={}, size={}", subscriptionID, add, size);
|
||||
}
|
||||
|
@ -203,7 +203,7 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter {
|
|||
}
|
||||
|
||||
if (isRebuilding()) {
|
||||
recordedValueUpdater.addAndGet(this, value);
|
||||
recordedValueUpdater.addAndGet(this, add);
|
||||
recordedSizeUpdater.addAndGet(this, size);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.util.SortedMap;
|
|||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
|
@ -105,9 +104,6 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
|||
|
||||
private final PageSubscriptionCounter counter;
|
||||
|
||||
private final AtomicLong deliveredCount = new AtomicLong(0);
|
||||
|
||||
private final AtomicLong deliveredSize = new AtomicLong(0);
|
||||
|
||||
PageSubscriptionImpl(final PageCursorProvider cursorProvider,
|
||||
final PagingStore pageStore,
|
||||
|
@ -186,7 +182,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
|||
if (empty) {
|
||||
return 0;
|
||||
} else {
|
||||
return counter.getValue() - deliveredCount.get();
|
||||
return counter.getValue();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -202,7 +198,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
|||
} else {
|
||||
//A negative value could happen if an old journal was loaded that didn't have
|
||||
//size metrics for old records
|
||||
long messageSize = counter.getPersistentSize() - deliveredSize.get();
|
||||
long messageSize = counter.getPersistentSize();
|
||||
return messageSize > 0 ? messageSize : 0;
|
||||
}
|
||||
}
|
||||
|
@ -399,29 +395,21 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void confirmPosition(final Transaction tx, final PagePosition position) throws Exception {
|
||||
public void confirmPosition(final Transaction tx, final PagePosition position, boolean fromDelivery) throws Exception {
|
||||
// if the cursor is persistent
|
||||
if (persistent) {
|
||||
store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
|
||||
}
|
||||
installTXCallback(tx, position);
|
||||
installTXCallback(tx, position, fromDelivery);
|
||||
|
||||
}
|
||||
|
||||
private void confirmPosition(final Transaction tx, final PagePosition position, final long persistentSize) throws Exception {
|
||||
// if the cursor is persistent
|
||||
if (persistent) {
|
||||
store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
|
||||
}
|
||||
installTXCallback(tx, position, persistentSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void ackTx(final Transaction tx, final PagedReference reference) throws Exception {
|
||||
public void ackTx(final Transaction tx, final PagedReference reference, boolean fromDelivery) throws Exception {
|
||||
//pre-calculate persistentSize
|
||||
final long persistentSize = getPersistentSize(reference);
|
||||
|
||||
confirmPosition(tx, reference.getPagedMessage().newPositionObject(), persistentSize);
|
||||
confirmPosition(tx, reference.getPagedMessage().newPositionObject(), true);
|
||||
|
||||
counter.increment(tx, -1, -persistentSize);
|
||||
|
||||
|
@ -584,8 +572,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
|||
|
||||
@Override
|
||||
public void reloadPreparedACK(final Transaction tx, final PagePosition position) {
|
||||
deliveredCount.incrementAndGet();
|
||||
installTXCallback(tx, position);
|
||||
installTXCallback(tx, position, true);
|
||||
|
||||
try {
|
||||
counter.increment(tx, -1, -position.getPersistentSize());
|
||||
|
@ -838,16 +825,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
|||
return info;
|
||||
}
|
||||
|
||||
private void installTXCallback(final Transaction tx, final PagePosition position) {
|
||||
installTXCallback(tx, position, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param tx
|
||||
* @param position
|
||||
* @param persistentSize if negative it needs to be calculated on the fly
|
||||
*/
|
||||
private void installTXCallback(final Transaction tx, final PagePosition position, final long persistentSize) {
|
||||
private void installTXCallback(final Transaction tx, final PagePosition position, final boolean fromDelivery) {
|
||||
if (position.getRecordID() >= 0) {
|
||||
// It needs to persist, otherwise the cursor will return to the fist page position
|
||||
tx.setContainsPersistent();
|
||||
|
@ -862,7 +840,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
|||
PageCursorTX cursorTX = (PageCursorTX) tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
|
||||
|
||||
if (cursorTX == null) {
|
||||
cursorTX = new PageCursorTX();
|
||||
cursorTX = new PageCursorTX(fromDelivery);
|
||||
tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS, cursorTX);
|
||||
tx.addOperation(cursorTX);
|
||||
}
|
||||
|
@ -1118,6 +1096,12 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
|||
|
||||
private final class PageCursorTX extends TransactionOperationAbstract {
|
||||
|
||||
private boolean fromDelivery;
|
||||
|
||||
PageCursorTX(boolean fromDelivery) {
|
||||
this.fromDelivery = fromDelivery;
|
||||
}
|
||||
|
||||
private final Map<PageSubscriptionImpl, List<PagePosition>> pendingPositions = new HashMap<>();
|
||||
|
||||
private void addPositionConfirmation(final PageSubscriptionImpl cursor, final PagePosition position) {
|
||||
|
@ -1140,8 +1124,6 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
|||
|
||||
for (PagePosition confirmed : positions) {
|
||||
cursor.processACK(confirmed);
|
||||
cursor.deliveredCount.decrementAndGet();
|
||||
cursor.deliveredSize.addAndGet(-confirmed.getPersistentSize());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1435,7 +1417,6 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
|||
|
||||
@Override
|
||||
public void remove() {
|
||||
deliveredCount.incrementAndGet();
|
||||
PagedReference delivery = currentDelivery;
|
||||
if (delivery != null) {
|
||||
PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(delivery.getPagedMessage().getPageNumber());
|
||||
|
@ -1455,36 +1436,6 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the deliveredCount
|
||||
*/
|
||||
@Override
|
||||
public long getDeliveredCount() {
|
||||
return deliveredCount.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the deliveredSize
|
||||
*/
|
||||
@Override
|
||||
public long getDeliveredSize() {
|
||||
return deliveredSize.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementDeliveredSize(long size) {
|
||||
deliveredSize.addAndGet(size);
|
||||
}
|
||||
|
||||
private long getPersistentSize(PagedMessage msg) {
|
||||
try {
|
||||
return msg != null && msg.getPersistentSize() > 0 ? msg.getPersistentSize() : 0;
|
||||
} catch (ActiveMQException e) {
|
||||
logger.warn("Error computing persistent size of message: {}", msg, e);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
private long getPersistentSize(PagedReference ref) {
|
||||
try {
|
||||
return ref != null && ref.getPersistentSize() > 0 ? ref.getPersistentSize() : 0;
|
||||
|
|
|
@ -479,6 +479,14 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
return started;
|
||||
}
|
||||
|
||||
private volatile boolean rebuildingPageCounters;
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isRebuildingCounters() {
|
||||
return rebuildingPageCounters;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
lock();
|
||||
|
@ -589,6 +597,9 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
|
||||
@Override
|
||||
public Future<Object> rebuildCounters(Set<Long> storedLargeMessages) {
|
||||
if (rebuildingPageCounters) {
|
||||
logger.debug("Rebuild page counters is already underway, ignoring call");
|
||||
}
|
||||
Map<Long, PageTransactionInfo> transactionsSet = new LongObjectHashMap();
|
||||
// making a copy
|
||||
transactions.forEach((a, b) -> {
|
||||
|
@ -617,6 +628,8 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
FutureTask<Object> task = new FutureTask<>(() -> null);
|
||||
managerExecutor.execute(task);
|
||||
|
||||
managerExecutor.execute(() -> rebuildingPageCounters = false);
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
|
|
|
@ -28,12 +28,20 @@ public interface ScheduledDeliveryHandler {
|
|||
|
||||
int getScheduledCount();
|
||||
|
||||
int getNonPagedScheduledCount();
|
||||
|
||||
long getScheduledSize();
|
||||
|
||||
long getNonPagedScheduledSize();
|
||||
|
||||
int getDurableScheduledCount();
|
||||
|
||||
int getNonPagedDurableScheduledCount();
|
||||
|
||||
long getDurableScheduledSize();
|
||||
|
||||
long getNonPagedDurableScheduledSize();
|
||||
|
||||
MessageReference peekFirstScheduledMessage();
|
||||
|
||||
List<MessageReference> getScheduledReferences();
|
||||
|
|
|
@ -1820,11 +1820,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
if (pageSubscription != null) {
|
||||
// messageReferences will have depaged messages which we need to discount from the counter as they are
|
||||
// counted on the pageSubscription as well
|
||||
long returnValue = (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Queue={}/{} returning getMessageCount \n\treturning {}. \n\tpendingMetrics.getMessageCount() = {}, \n\tgetScheduledCount() = {}, \n\tpageSubscription.getMessageCount()={}, \n\tpageSubscription.getCounter().getValue()={}, \n\tpageSubscription.getDeliveredCount()={}",
|
||||
name, id, returnValue, pendingMetrics.getMessageCount(), getScheduledCount(), pageSubscription.getMessageCount(), pageSubscription.getCounter().getValue(),
|
||||
pageSubscription.getDeliveredCount());
|
||||
long returnValue = (long) pendingMetrics.getNonPagedMessageCount() + scheduledDeliveryHandler.getNonPagedScheduledCount() + deliveringMetrics.getNonPagedMessageCount() + pageSubscription.getMessageCount();
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Queue={}/{} returning getMessageCount \n\treturning {}. \n\tpendingMetrics.getMessageCount() = {}, \n\tgetScheduledCount() = {}, \n\tpageSubscription.getMessageCount()={}, \n\tpageSubscription.getCounter().getValue()={}",
|
||||
name, id, returnValue, pendingMetrics.getMessageCount(), scheduledDeliveryHandler.getNonPagedScheduledCount(), pageSubscription.getMessageCount(), pageSubscription.getCounter().getValue());
|
||||
}
|
||||
return returnValue;
|
||||
} else {
|
||||
|
@ -1837,7 +1836,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
if (pageSubscription != null) {
|
||||
// messageReferences will have depaged messages which we need to discount from the counter as they are
|
||||
// counted on the pageSubscription as well
|
||||
return pendingMetrics.getPersistentSize() + getScheduledSize() + getDeliveringSize() + pageSubscription.getPersistentSize();
|
||||
return pendingMetrics.getNonPagedPersistentSize() + scheduledDeliveryHandler.getNonPagedScheduledSize() + deliveringMetrics.getNonPagedPersistentSize() + pageSubscription.getPersistentSize();
|
||||
} else {
|
||||
return pendingMetrics.getPersistentSize() + getScheduledSize() + getDeliveringSize();
|
||||
}
|
||||
|
@ -1847,7 +1846,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
public long getDurableMessageCount() {
|
||||
if (isDurable()) {
|
||||
if (pageSubscription != null) {
|
||||
return (long) pendingMetrics.getDurableMessageCount() + getDurableScheduledCount() + getDurableDeliveringCount() + pageSubscription.getMessageCount();
|
||||
return (long) pendingMetrics.getNonPagedDurableMessageCount() + scheduledDeliveryHandler.getNonPagedDurableScheduledCount() + deliveringMetrics.getNonPagedDurableMessageCount() + pageSubscription.getMessageCount();
|
||||
} else {
|
||||
return (long) pendingMetrics.getDurableMessageCount() + getDurableScheduledCount() + getDurableDeliveringCount();
|
||||
}
|
||||
|
@ -1859,7 +1858,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
public long getDurablePersistentSize() {
|
||||
if (isDurable()) {
|
||||
if (pageSubscription != null) {
|
||||
return pendingMetrics.getDurablePersistentSize() + getDurableScheduledSize() + getDurableDeliveringSize() + pageSubscription.getPersistentSize();
|
||||
return pendingMetrics.getDurablePersistentSize() + scheduledDeliveryHandler.getNonPagedDurableScheduledSize() + deliveringMetrics.getNonPagedDurablePersistentSize() + pageSubscription.getPersistentSize();
|
||||
} else {
|
||||
return pendingMetrics.getDurablePersistentSize() + getDurableScheduledSize() + getDurableDeliveringSize();
|
||||
}
|
||||
|
@ -3496,9 +3495,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
}
|
||||
addTail(reference, false);
|
||||
pageIterator.remove();
|
||||
|
||||
//We have to increment this here instead of in the iterator so we have access to the reference from next()
|
||||
pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
|
|
@ -35,23 +35,43 @@ public class QueueMessageMetrics {
|
|||
private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> COUNT_UPDATER =
|
||||
AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, "messageCount");
|
||||
|
||||
private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> COUNT_UPDATER_PAGED =
|
||||
AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, "messageCountPaged");
|
||||
|
||||
private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> DURABLE_COUNT_UPDATER =
|
||||
AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, "durableMessageCount");
|
||||
|
||||
private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> DURABLE_COUNT_UPDATER_PAGED =
|
||||
AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, "durableMessageCountPaged");
|
||||
|
||||
private static final AtomicLongFieldUpdater<QueueMessageMetrics> SIZE_UPDATER =
|
||||
AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, "persistentSize");
|
||||
|
||||
private static final AtomicLongFieldUpdater<QueueMessageMetrics> SIZE_UPDATER_PAGED =
|
||||
AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, "persistentSizePaged");
|
||||
|
||||
private static final AtomicLongFieldUpdater<QueueMessageMetrics> DURABLE_SIZE_UPDATER =
|
||||
AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, "durablePersistentSize");
|
||||
|
||||
private static final AtomicLongFieldUpdater<QueueMessageMetrics> DURABLE_SIZE_UPDATER_PAGED =
|
||||
AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, "durablePersistentSizePaged");
|
||||
|
||||
private volatile int messageCount;
|
||||
|
||||
private volatile int messageCountPaged;
|
||||
|
||||
private volatile long persistentSize;
|
||||
|
||||
private volatile long persistentSizePaged;
|
||||
|
||||
private volatile int durableMessageCount;
|
||||
|
||||
private volatile int durableMessageCountPaged;
|
||||
|
||||
private volatile long durablePersistentSize;
|
||||
|
||||
private volatile long durablePersistentSizePaged;
|
||||
|
||||
private final Queue queue;
|
||||
|
||||
private final String name;
|
||||
|
@ -64,89 +84,99 @@ public class QueueMessageMetrics {
|
|||
|
||||
public void incrementMetrics(final MessageReference reference) {
|
||||
long size = getPersistentSize(reference);
|
||||
COUNT_UPDATER.incrementAndGet(this);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("{} increment messageCount to {}: {}", this, messageCount, reference);
|
||||
}
|
||||
SIZE_UPDATER.addAndGet(this, size);
|
||||
if (queue.isDurable() && reference.isDurable()) {
|
||||
DURABLE_COUNT_UPDATER.incrementAndGet(this);
|
||||
DURABLE_SIZE_UPDATER.addAndGet(this, size);
|
||||
if (reference.isPaged()) {
|
||||
COUNT_UPDATER_PAGED.incrementAndGet(this);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("{} paged messageCountPaged to {}: {}", this, messageCountPaged, reference);
|
||||
}
|
||||
SIZE_UPDATER_PAGED.addAndGet(this, size);
|
||||
if (queue.isDurable() && reference.isDurable()) {
|
||||
DURABLE_COUNT_UPDATER_PAGED.incrementAndGet(this);
|
||||
DURABLE_SIZE_UPDATER_PAGED.addAndGet(this, size);
|
||||
}
|
||||
} else {
|
||||
COUNT_UPDATER.incrementAndGet(this);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("{} increment messageCount to {}: {}", this, messageCount, reference);
|
||||
}
|
||||
SIZE_UPDATER.addAndGet(this, size);
|
||||
if (queue.isDurable() && reference.isDurable()) {
|
||||
DURABLE_COUNT_UPDATER.incrementAndGet(this);
|
||||
DURABLE_SIZE_UPDATER.addAndGet(this, size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void decrementMetrics(final MessageReference reference) {
|
||||
long size = -getPersistentSize(reference);
|
||||
COUNT_UPDATER.decrementAndGet(this);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("{} decrement messageCount to {}: {}", this, messageCount, reference);
|
||||
}
|
||||
SIZE_UPDATER.addAndGet(this, size);
|
||||
if (queue.isDurable() && reference.isDurable()) {
|
||||
DURABLE_COUNT_UPDATER.decrementAndGet(this);
|
||||
DURABLE_SIZE_UPDATER.addAndGet(this, size);
|
||||
if (reference.isPaged()) {
|
||||
COUNT_UPDATER_PAGED.decrementAndGet(this);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("{} decrement messageCount to {}: {}", this, messageCountPaged, reference);
|
||||
}
|
||||
SIZE_UPDATER_PAGED.addAndGet(this, size);
|
||||
if (queue.isDurable() && reference.isDurable()) {
|
||||
DURABLE_COUNT_UPDATER_PAGED.decrementAndGet(this);
|
||||
DURABLE_SIZE_UPDATER_PAGED.addAndGet(this, size);
|
||||
}
|
||||
} else {
|
||||
COUNT_UPDATER.decrementAndGet(this);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("{} decrement messageCount to {}: {}", this, messageCount, reference);
|
||||
}
|
||||
SIZE_UPDATER.addAndGet(this, size);
|
||||
if (queue.isDurable() && reference.isDurable()) {
|
||||
DURABLE_COUNT_UPDATER.decrementAndGet(this);
|
||||
DURABLE_SIZE_UPDATER.addAndGet(this, size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public int getNonPagedMessageCount() {
|
||||
return messageCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the messageCount
|
||||
*/
|
||||
public int getMessageCount() {
|
||||
return messageCount;
|
||||
return messageCount + messageCountPaged;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param messageCount the messageCount to set
|
||||
*/
|
||||
public void setMessageCount(int messageCount) {
|
||||
this.messageCount = messageCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the persistentSize
|
||||
*/
|
||||
public long getPersistentSize() {
|
||||
return persistentSize + persistentSizePaged;
|
||||
}
|
||||
|
||||
public long getNonPagedPersistentSize() {
|
||||
return persistentSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param persistentSize the persistentSize to set
|
||||
*/
|
||||
public void setPersistentSize(long persistentSize) {
|
||||
this.persistentSize = persistentSize;
|
||||
public long getNonPagedDurablePersistentSize() {
|
||||
return durablePersistentSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the durableMessageCount
|
||||
*/
|
||||
public int getDurableMessageCount() {
|
||||
return durableMessageCount;
|
||||
return durableMessageCount + durableMessageCountPaged;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param durableMessageCount the durableMessageCount to set
|
||||
*/
|
||||
public void setDurableMessageCount(int durableMessageCount) {
|
||||
this.durableMessageCount = durableMessageCount;
|
||||
public int getNonPagedDurableMessageCount() {
|
||||
return durableMessageCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the durablePersistentSize
|
||||
*/
|
||||
public long getDurablePersistentSize() {
|
||||
return durablePersistentSize;
|
||||
return durablePersistentSize + durablePersistentSizePaged;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param durablePersistentSize the durablePersistentSize to set
|
||||
*/
|
||||
public void setDurablePersistentSize(long durablePersistentSize) {
|
||||
this.durablePersistentSize = durablePersistentSize;
|
||||
}
|
||||
|
||||
private long getPersistentSize(final MessageReference reference) {
|
||||
private static long getPersistentSize(final MessageReference reference) {
|
||||
long size = 0;
|
||||
|
||||
try {
|
||||
|
@ -158,9 +188,4 @@ public class QueueMessageMetrics {
|
|||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "QueuePendingMessageMetrics[queue=" + queue.getName() + ", name=" + name + "]";
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -96,21 +96,41 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
|
|||
return metrics.getMessageCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNonPagedScheduledCount() {
|
||||
return metrics.getNonPagedMessageCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getDurableScheduledCount() {
|
||||
return metrics.getDurableMessageCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNonPagedDurableScheduledCount() {
|
||||
return metrics.getNonPagedDurableMessageCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getScheduledSize() {
|
||||
return metrics.getPersistentSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNonPagedScheduledSize() {
|
||||
return metrics.getNonPagedPersistentSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDurableScheduledSize() {
|
||||
return metrics.getDurablePersistentSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNonPagedDurableScheduledSize() {
|
||||
return metrics.getNonPagedPersistentSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<MessageReference> getScheduledReferences() {
|
||||
List<MessageReference> refs = new LinkedList<>();
|
||||
|
|
|
@ -76,7 +76,7 @@ public class AckManagerTest extends ActiveMQTestBase {
|
|||
super.setUp();
|
||||
|
||||
server1 = createServer(true, createDefaultConfig(0, true), 100024, -1, -1, -1);
|
||||
server1.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1));
|
||||
server1.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1).setMaxReadPageMessages(20));
|
||||
server1.getConfiguration().getAcceptorConfigurations().clear();
|
||||
server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616");
|
||||
server1.start();
|
||||
|
@ -289,6 +289,110 @@ public class AckManagerTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testRetryFromPaging() throws Throwable {
|
||||
|
||||
String protocol = "AMQP";
|
||||
|
||||
SimpleString TOPIC_NAME = SimpleString.toSimpleString("tp" + RandomUtil.randomString());
|
||||
|
||||
server1.addAddressInfo(new AddressInfo(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
|
||||
|
||||
ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
|
||||
|
||||
// creating 5 subscriptions
|
||||
for (int i = 0; i < 2; i++) {
|
||||
try (Connection connection = connectionFactory.createConnection()) {
|
||||
connection.setClientID("c" + i);
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = session.createTopic(TOPIC_NAME.toString());
|
||||
session.createDurableSubscriber(topic, "s" + i);
|
||||
}
|
||||
}
|
||||
|
||||
int numberOfMessages = 15000;
|
||||
int numberOfAcksC0 = 100;
|
||||
int numberOfAcksC1 = 14999;
|
||||
|
||||
String c0s0Name = "c0.s0";
|
||||
String c1s1Name = "c1.s1";
|
||||
|
||||
final Queue c0s0 = server1.locateQueue(c0s0Name);
|
||||
Assert.assertNotNull(c0s0);
|
||||
final Queue c1s1 = server1.locateQueue(c1s1Name);
|
||||
Assert.assertNotNull(c1s1);
|
||||
|
||||
PagingStore store = server1.getPagingManager().getPageStore(TOPIC_NAME);
|
||||
store.startPaging();
|
||||
|
||||
try (Connection connection = connectionFactory.createConnection()) {
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Topic topic = session.createTopic(TOPIC_NAME.toString());
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
Message m = session.createTextMessage("hello " + i);
|
||||
m.setIntProperty("i", i);
|
||||
producer.send(m);
|
||||
if ((i + 1) % 100 == 0) {
|
||||
c1s1.pause();
|
||||
c0s0.pause();
|
||||
session.commit();
|
||||
}
|
||||
}
|
||||
session.commit();
|
||||
}
|
||||
|
||||
ReferenceIDSupplier referenceIDSupplier = new ReferenceIDSupplier(server1);
|
||||
|
||||
{
|
||||
AckManager ackManager = AckManagerProvider.getManager(server1);
|
||||
ackManager.stop();
|
||||
|
||||
AtomicInteger counter = new AtomicInteger(0);
|
||||
|
||||
for (long pageID = store.getFirstPage(); pageID <= store.getCurrentWritingPage(); pageID++) {
|
||||
Page page = store.usePage(pageID);
|
||||
try {
|
||||
page.getMessages().forEach(pagedMessage -> {
|
||||
int increment = counter.incrementAndGet();
|
||||
if (increment <= numberOfAcksC0) {
|
||||
ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), c0s0, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL);
|
||||
}
|
||||
if (increment <= numberOfAcksC1) {
|
||||
ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), c1s1, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL);
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
page.usageDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
server1.stop();
|
||||
|
||||
server1.start();
|
||||
|
||||
|
||||
Queue c0s0AfterRestart = server1.locateQueue(c0s0Name);
|
||||
Assert.assertNotNull(c0s0AfterRestart);
|
||||
Queue c1s1AfterRestart = server1.locateQueue(c1s1Name);
|
||||
Assert.assertNotNull(c1s1AfterRestart);
|
||||
|
||||
Wait.assertEquals(numberOfMessages - numberOfAcksC1, c1s1AfterRestart::getMessageCount, 10_000);
|
||||
Wait.assertEquals(numberOfAcksC1, c1s1AfterRestart::getMessagesAcknowledged, 10_000);
|
||||
Wait.assertEquals(numberOfMessages - numberOfAcksC0, c0s0AfterRestart::getMessageCount, 10_000);
|
||||
Wait.assertEquals(numberOfAcksC0, c0s0AfterRestart::getMessagesAcknowledged, 10_000);
|
||||
|
||||
server1.stop();
|
||||
|
||||
Assert.assertEquals(0, AckManagerProvider.getSize());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private int getCounter(byte typeRecord, HashMap<Integer, AtomicInteger> values) {
|
||||
AtomicInteger value = values.get((int) typeRecord);
|
||||
if (value == null) {
|
||||
|
|
|
@ -185,8 +185,8 @@ public class JournalPendingMessageTest extends AbstractPersistentStatTestSupport
|
|||
|
||||
AtomicLong publishedMessageSize = new AtomicLong();
|
||||
|
||||
publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
|
||||
verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
|
||||
publishTestQueueMessages(10, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
|
||||
verifyPendingStats(defaultQueueName, 10, publishedMessageSize.get());
|
||||
verifyPendingDurableStats(defaultQueueName, 0, 0);
|
||||
}
|
||||
|
||||
|
@ -196,10 +196,10 @@ public class JournalPendingMessageTest extends AbstractPersistentStatTestSupport
|
|||
AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
|
||||
AtomicLong publishedMessageSize = new AtomicLong();
|
||||
|
||||
publishTestQueueMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize);
|
||||
publishTestQueueMessages(100, DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize);
|
||||
verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get());
|
||||
verifyPendingDurableStats(defaultQueueName, 100, publishedMessageSize.get());
|
||||
publishTestQueueMessages(5, DeliveryMode.PERSISTENT, publishedMessageSize);
|
||||
publishTestQueueMessages(10, DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize);
|
||||
verifyPendingStats(defaultQueueName, 15, publishedMessageSize.get() + publishedNonPersistentMessageSize.get());
|
||||
verifyPendingDurableStats(defaultQueueName, 5, publishedMessageSize.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.io.StringWriter;
|
|||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -61,11 +62,16 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
|
||||
// Set this to true and log4j will be configured with some relevant log.trace for the AckManager at the server's
|
||||
private static final boolean TRACE_LOGS = Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", "false"));
|
||||
private static final int NUMBER_MESSAGES = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_500);
|
||||
private static final int NUMBER_MESSAGES = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_000);
|
||||
|
||||
// By default consuming 90% of the messages
|
||||
private static final int NUMBER_MESSAGES_RECEIVE = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES_RECEIVE", 1_800);
|
||||
private static final int RECEIVE_COMMIT = TestParameters.testProperty(TEST_NAME, "RECEIVE_COMMIT", 100);
|
||||
private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
|
||||
private static final int KILL_INTERNAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
|
||||
private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
|
||||
|
||||
// If -1 means to never kill the target broker
|
||||
private static final int KILL_INTERVAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 1_000);
|
||||
private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 300_000);
|
||||
private static final int GENERAL_WAIT_TIMEOUT = TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);
|
||||
|
||||
/*
|
||||
|
@ -134,14 +140,14 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
|
||||
brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
|
||||
brokerProperties.put("largeMessageSync", "false");
|
||||
brokerProperties.put("mirrorAckManagerPageAttempts", "10");
|
||||
brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
|
||||
//brokerProperties.put("mirrorAckManagerPageAttempts", "20");
|
||||
//brokerProperties.put("mirrorAckManagerRetryDelay", "100");
|
||||
|
||||
if (paging) {
|
||||
brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
|
||||
brokerProperties.put("addressSettings.#.maxReadPageMessages", "1000");
|
||||
brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
|
||||
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
|
||||
brokerProperties.put("addressSettings.#.prefetchPageMessages", "100");
|
||||
brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
|
||||
// un-comment this line if you want to rather use the work around without the fix on the PostOfficeImpl
|
||||
// brokerProperties.put("addressSettings.#.iDCacheSize", "1000");
|
||||
}
|
||||
|
@ -162,6 +168,12 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
+ "logger.ack.level=TRACE\n"
|
||||
+ "logger.config.name=org.apache.activemq.artemis.core.config.impl.ConfigurationImpl\n"
|
||||
+ "logger.config.level=TRACE\n"
|
||||
+ "logger.counter.name=org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl\n"
|
||||
+ "logger.counter.level=DEBUG\n"
|
||||
+ "logger.queue.name=org.apache.activemq.artemis.core.server.impl.QueueImpl\n"
|
||||
+ "logger.queue.level=DEBUG\n"
|
||||
+ "logger.rebuild.name=org.apache.activemq.artemis.core.paging.cursor.impl.PageCounterRebuildManager\n"
|
||||
+ "logger.rebuild.level=DEBUG\n"
|
||||
+ "appender.console.filter.threshold.type = ThresholdFilter\n"
|
||||
+ "appender.console.filter.threshold.level = info"));
|
||||
}
|
||||
|
@ -187,7 +199,7 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
startServers();
|
||||
|
||||
|
||||
Assert.assertTrue(KILL_INTERNAL > SEND_COMMIT);
|
||||
Assert.assertTrue(KILL_INTERVAL > SEND_COMMIT || KILL_INTERVAL < 0);
|
||||
|
||||
String clientIDA = "nodeA";
|
||||
String clientIDB = "nodeB";
|
||||
|
@ -212,18 +224,23 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(3);
|
||||
runAfter(executorService::shutdownNow);
|
||||
CountDownLatch consumerDone = new CountDownLatch(2);
|
||||
executorService.execute(() -> {
|
||||
try {
|
||||
consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, NUMBER_MESSAGES, true, false, RECEIVE_COMMIT);
|
||||
consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, NUMBER_MESSAGES_RECEIVE, false, false, RECEIVE_COMMIT);
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
} finally {
|
||||
consumerDone.countDown();
|
||||
}
|
||||
});
|
||||
executorService.execute(() -> {
|
||||
try {
|
||||
consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, NUMBER_MESSAGES, true, false, RECEIVE_COMMIT);
|
||||
consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, NUMBER_MESSAGES_RECEIVE, false, false, RECEIVE_COMMIT);
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
} finally {
|
||||
consumerDone.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -243,7 +260,7 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
logger.info("Sent {} messages", i);
|
||||
session.commit();
|
||||
}
|
||||
if (i > 0 && i % KILL_INTERNAL == 0) {
|
||||
if (KILL_INTERVAL > 0 && i > 0 && i % KILL_INTERVAL == 0) {
|
||||
restartExeuctor.execute(() -> {
|
||||
if (running.get()) {
|
||||
try {
|
||||
|
@ -265,12 +282,14 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
running.set(false);
|
||||
}
|
||||
|
||||
consumerDone.await(SNF_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
|
||||
Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
|
||||
destroyServers();
|
||||
|
||||
|
@ -337,9 +356,10 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
|
||||
public long getCount(SimpleManagement simpleManagement, String queue) {
|
||||
try {
|
||||
long value = simpleManagement.getMessageCountOnQueue(queue);
|
||||
logger.info("Queue {} count = {}", queue, value);
|
||||
return value;
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
|
|
|
@ -0,0 +1,155 @@
|
|||
#!/bin/sh
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# use this script for larger parameters, possibly when using larger machines
|
||||
|
||||
|
||||
## Generic variable:
|
||||
# Some tests will support saving the producer's state before consumption. If you set this variable these tests will hold a zip file and recover it approprieatedly.
|
||||
#export TEST_ZIP_LOCATION=~/zipTest/
|
||||
|
||||
#HorizontalPagingTest
|
||||
|
||||
export TEST_HORIZONTAL_TEST_ENABLED=true
|
||||
export TEST_HORIZONTAL_SERVER_START_TIMEOUT=300000
|
||||
export TEST_HORIZONTAL_TIMEOUT_MINUTES=120
|
||||
export TEST_HORIZONTAL_PROTOCOL_LIST=OPENWIRE,CORE,AMQP
|
||||
|
||||
export TEST_HORIZONTAL_CORE_DESTINATIONS=20
|
||||
export TEST_HORIZONTAL_CORE_MESSAGES=5000
|
||||
export TEST_HORIZONTAL_CORE_COMMIT_INTERVAL=1000
|
||||
export TEST_HORIZONTAL_CORE_RECEIVE_COMMIT_INTERVAL=0
|
||||
export TEST_HORIZONTAL_CORE_MESSAGE_SIZE=20000
|
||||
export TEST_HORIZONTAL_CORE_PARALLEL_SENDS=20
|
||||
|
||||
export TEST_HORIZONTAL_AMQP_DESTINATIONS=20
|
||||
export TEST_HORIZONTAL_AMQP_MESSAGES=1000
|
||||
export TEST_HORIZONTAL_AMQP_COMMIT_INTERVAL=100
|
||||
export TEST_HORIZONTAL_AMQP_RECEIVE_COMMIT_INTERVAL=0
|
||||
export TEST_HORIZONTAL_AMQP_MESSAGE_SIZE=20000
|
||||
export TEST_HORIZONTAL_AMQP_PARALLEL_SENDS=10
|
||||
|
||||
export TEST_HORIZONTAL_OPENWIRE_DESTINATIONS=20
|
||||
export TEST_HORIZONTAL_OPENWIRE_MESSAGES=1000
|
||||
export TEST_HORIZONTAL_OPENWIRE_COMMIT_INTERVAL=100
|
||||
export TEST_HORIZONTAL_OPENWIRE_RECEIVE_COMMIT_INTERVAL=0
|
||||
export TEST_HORIZONTAL_OPENWIRE_MESSAGE_SIZE=20000
|
||||
export TEST_HORIZONTAL_OPENWIRE_PARALLEL_SENDS=10
|
||||
|
||||
export TEST_FLOW_SERVER_START_TIMEOUT=300000
|
||||
export TEST_FLOW_TIMEOUT_MINUTES=120
|
||||
|
||||
|
||||
# FlowControlPagingTest
|
||||
export TEST_FLOW_PROTOCOL_LIST=CORE,AMQP,OPENWIRE
|
||||
export TEST_FLOW_PRINT_INTERVAL=100
|
||||
|
||||
export TEST_FLOW_OPENWIRE_MESSAGES=10000
|
||||
export TEST_FLOW_OPENWIRE_COMMIT_INTERVAL=1000
|
||||
export TEST_FLOW_OPENWIRE_RECEIVE_COMMIT_INTERVAL=10
|
||||
export TEST_FLOW_OPENWIRE_MESSAGE_SIZE=60000
|
||||
|
||||
export TEST_FLOW_CORE_MESSAGES=10000
|
||||
export TEST_FLOW_CORE_COMMIT_INTERVAL=1000
|
||||
export TEST_FLOW_CORE_RECEIVE_COMMIT_INTERVAL=10
|
||||
export TEST_FLOW_CORE_MESSAGE_SIZE=30000
|
||||
|
||||
export TEST_FLOW_AMQP_MESSAGES=10000
|
||||
export TEST_FLOW_AMQP_COMMIT_INTERVAL=1000
|
||||
export TEST_FLOW_AMQP_RECEIVE_COMMIT_INTERVAL=10
|
||||
export TEST_FLOW_AMQP_MESSAGE_SIZE=30000
|
||||
|
||||
|
||||
# SubscriptionPagingTest
|
||||
export TEST_SUBSCRIPTION_PROTOCOL_LIST=CORE
|
||||
|
||||
export TEST_SUBSCRIPTION_SERVER_START_TIMEOUT=300000
|
||||
export TEST_SUBSCRIPTION_TIMEOUT_MINUTES=120
|
||||
export TEST_SUBSCRIPTION_PRINT_INTERVAL=100
|
||||
export TEST_SUBSCRIPTION_SLOW_SUBSCRIPTIONS=5
|
||||
|
||||
|
||||
export TEST_SUBSCRIPTION_CORE_MESSAGES=10000
|
||||
export TEST_SUBSCRIPTION_CORE_COMMIT_INTERVAL=1000
|
||||
export TEST_SUBSCRIPTION_CORE_RECEIVE_COMMIT_INTERVAL=0
|
||||
export TEST_SUBSCRIPTION_CORE_MESSAGE_SIZE=30000
|
||||
export TEST_SUBSCRIPTION_SLEEP_SLOW=1000
|
||||
|
||||
|
||||
#OWLeakTest
|
||||
export TEST_OW_LEAK_TEST_ENABLED=true
|
||||
export TEST_OW_LEAK_PROTOCOL_LIST=OPENWIRE
|
||||
export TEST_OW_LEAK_OPENWIRE_NUMBER_OF_MESSAGES=15
|
||||
export TEST_OW_LEAK_OPENWIRE_PRODUCERS=1
|
||||
export TEST_OW_LEAK_OPENWIRE_MESSAGE_SIZE=2000000
|
||||
export TEST_OW_LEAK_PRINT_INTERVAL=1
|
||||
|
||||
#DatabasePagingTest
|
||||
export TEST_PGDB_DB_LIST=derby
|
||||
# use this to allow all the databases
|
||||
#export TEST_PGDB_DB_LIST=derby,postgres,mysql
|
||||
export TEST_PGDB_MAX_MESSAGES=500
|
||||
export TEST_PGDB_MESSAGE_SIZE=100
|
||||
export TEST_PGDB_COMMIT_INTERVAL=50
|
||||
|
||||
#ClientFailureSoakTest
|
||||
export TEST_CLIENT_FAILURE_TEST_ENABLED=true
|
||||
export TEST_CLIENT_FAILURE_PROTOCOL_LIST=AMQP,CORE,OPENWIRE
|
||||
|
||||
export TEST_CLIENT_FAILURE_AMQP_USE_LARGE_MESSAGE=TRUE
|
||||
export TEST_CLIENT_FAILURE_AMQP_THREADS_PER_VM=20
|
||||
export TEST_CLIENT_FAILURE_AMQP_CLIENT_CONSUMERS_PER_THREAD=20
|
||||
export TEST_CLIENT_FAILURE_AMQP_TEST_REPEATS=1
|
||||
export TEST_CLIENT_FAILURE_AMQP_TOTAL_ITERATION=2
|
||||
export TEST_CLIENT_FAILURE_AMQP_NUMBER_OF_VMS=5
|
||||
export TEST_CLIENT_FAILURE_AMQP_NUMBER_OF_MESSAGES=20000
|
||||
export TEST_CLIENT_FAILURE_AMQP_MEMORY_CLIENT=-Xmx256m
|
||||
|
||||
export TEST_CLIENT_FAILURE_CORE_USE_LARGE_MESSAGE=TRUE
|
||||
export TEST_CLIENT_FAILURE_CORE_THREADS_PER_VM=20
|
||||
export TEST_CLIENT_FAILURE_CORE_CLIENT_CONSUMERS_PER_THREAD=20
|
||||
export TEST_CLIENT_FAILURE_CORE_TEST_REPEATS=1
|
||||
export TEST_CLIENT_FAILURE_CORE_TOTAL_ITERATION=2
|
||||
export TEST_CLIENT_FAILURE_CORE_NUMBER_OF_VMS=5
|
||||
export TEST_CLIENT_FAILURE_CORE_NUMBER_OF_MESSAGES=20000
|
||||
export TEST_CLIENT_FAILURE_CORE_MEMORY_CLIENT=-Xmx256m
|
||||
|
||||
export TEST_CLIENT_FAILURE_OPENWIRE_USE_LARGE_MESSAGE=TRUE
|
||||
export TEST_CLIENT_FAILURE_OPENWIRE_THREADS_PER_VM=20
|
||||
export TEST_CLIENT_FAILURE_OPENWIRE_CLIENT_CONSUMERS_PER_THREAD=20
|
||||
export TEST_CLIENT_FAILURE_OPENWIRE_TEST_REPEATS=1
|
||||
export TEST_CLIENT_FAILURE_OPENWIRE_TOTAL_ITERATION=2
|
||||
export TEST_CLIENT_FAILURE_OPENWIRE_NUMBER_OF_VMS=5
|
||||
export TEST_CLIENT_FAILURE_OPENWIRE_NUMBER_OF_MESSAGES=20000
|
||||
export TEST_CLIENT_FAILURE_OPENWIRE_MEMORY_CLIENT=-Xmx256m
|
||||
|
||||
#clusterNotificationsContinuityTest
|
||||
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_TEST_ENABLED=true
|
||||
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_SERVERS=3
|
||||
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200
|
||||
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10
|
||||
|
||||
export TEST_SINGLE_MIRROR_SOAK_TRACE_LOGS=false
|
||||
export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=500000
|
||||
export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES_RECEIVE=400000
|
||||
export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=500
|
||||
export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=1000
|
||||
export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=50000
|
||||
export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000000
|
||||
export TEST_SINGLE_MIRROR_SOAK_GENERAL_TIMEOUT=100000
|
||||
export TEST_SINGLE_MIRROR_SOAK_CONSUMER_PROCESSING_TIME=10
|
|
@ -144,8 +144,11 @@ export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200
|
|||
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10
|
||||
|
||||
export TEST_SINGLE_MIRROR_SOAK_TRACE_LOGS=false
|
||||
export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=2500
|
||||
export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=100
|
||||
export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=100
|
||||
export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=500
|
||||
export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000
|
||||
export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=50000
|
||||
export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES_RECEIVE=40000
|
||||
export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=500
|
||||
export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=1000
|
||||
export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=10000
|
||||
export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000
|
||||
export TEST_SINGLE_MIRROR_SOAK_GENERAL_TIMEOUT=10000
|
||||
export TEST_SINGLE_MIRROR_SOAK_CONSUMER_PROCESSING_TIME=10
|
||||
|
|
Loading…
Reference in New Issue