From a5a993ed9d67e47e97a0a8198d75600188df2eae Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 6 Jan 2016 15:45:19 -0500 Subject: [PATCH] ARTEMIS-332 - test added / better dealing with critical errors on paging --- .../artemis/core/paging/PagingStore.java | 2 - .../paging/cursor/PageCursorProvider.java | 5 +- .../core/paging/cursor/PageSubscription.java | 5 +- .../core/paging/cursor/PagedReference.java | 3 +- .../paging/cursor/PagedReferenceImpl.java | 17 +- .../cursor/impl/PageCursorProviderImpl.java | 10 +- .../cursor/impl/PageSubscriptionImpl.java | 102 +++------ .../core/paging/impl/PagingStoreImpl.java | 6 - .../artemis/core/server/MessageReference.java | 4 +- .../activemq/artemis/core/server/Queue.java | 2 +- .../core/server/impl/LastValueQueue.java | 79 ++----- .../artemis/core/server/impl/QueueImpl.java | 65 +++--- .../core/server/impl/RefsOperation.java | 37 ++-- .../tests/extras/byteman/PagingOMETest.java | 206 ++++++++++++++++++ .../storage/PersistMultiThreadTest.java | 4 - 15 files changed, 321 insertions(+), 226 deletions(-) create mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingOMETest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index c8808b3f1d..e831966d02 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -47,8 +47,6 @@ public interface PagingStore extends ActiveMQComponent { int getNumberOfPages(); - void criticalException(Throwable e); - /** * Returns the page id of the current page in which the system is writing files. */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java index c8404ba48e..951b83c46c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.paging.cursor; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -25,7 +24,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage; */ public interface PageCursorProvider { - PageCache getPageCache(long pageNr) throws ActiveMQException; + PageCache getPageCache(long pageNr); PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub); @@ -39,7 +38,7 @@ public interface PageCursorProvider { PageSubscription createSubscription(long queueId, Filter filter, boolean durable); - PagedMessage getMessage(PagePosition pos) throws ActiveMQException; + PagedMessage getMessage(PagePosition pos); void processReload() throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java index 386f21ffc6..df2ccc3627 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.paging.cursor; import java.util.concurrent.Executor; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.impl.Page; @@ -96,7 +95,7 @@ public interface PageSubscription { void reloadPageCompletion(PagePosition position); - void reloadPageInfo(long pageNr) throws ActiveMQException; + void reloadPageInfo(long pageNr); /** * To be called when the cursor decided to ignore a position. @@ -148,7 +147,7 @@ public interface PageSubscription { * @param pos * @return */ - PagedMessage queryMessage(PagePosition pos) throws ActiveMQException; + PagedMessage queryMessage(PagePosition pos); /** * @return executor used by the PageSubscription diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java index 46041c5841..c1ff089681 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.paging.cursor; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.server.MessageReference; @@ -24,5 +23,5 @@ public interface PagedReference extends MessageReference { PagePosition getPosition(); - PagedMessage getPagedMessage() throws ActiveMQException; + PagedMessage getPagedMessage(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 964737f18c..82b0e9283b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.paging.cursor; import java.lang.ref.WeakReference; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -50,12 +49,12 @@ public class PagedReferenceImpl implements PagedReference { private boolean alreadyAcked; @Override - public ServerMessage getMessage() throws ActiveMQException { + public ServerMessage getMessage() { return getPagedMessage().getMessage(); } @Override - public synchronized PagedMessage getPagedMessage() throws ActiveMQException { + public synchronized PagedMessage getPagedMessage() { PagedMessage returnMessage = message != null ? message.get() : null; // We only keep a few references on the Queue from paging... @@ -111,7 +110,7 @@ public class PagedReferenceImpl implements PagedReference { try { messageEstimate = getMessage().getMemoryEstimate(); } - catch (ActiveMQException e) { + catch (Throwable e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } } @@ -120,13 +119,7 @@ public class PagedReferenceImpl implements PagedReference { @Override public MessageReference copy(final Queue queue) { - try { - return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription); - } - catch (ActiveMQException e) { - ActiveMQServerLogger.LOGGER.warn(e); - return this; - } + return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription); } @Override @@ -141,7 +134,7 @@ public class PagedReferenceImpl implements PagedReference { deliveryTime = 0L; } } - catch (ActiveMQException e) { + catch (Throwable e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); return 0L; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index ef57e1c769..5f5e1b32e7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -24,8 +24,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -111,7 +109,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { } @Override - public PagedMessage getMessage(final PagePosition pos) throws ActiveMQException { + public PagedMessage getMessage(final PagePosition pos) { PageCache cache = getPageCache(pos.getPageNr()); if (cache == null || pos.getMessageNr() >= cache.getNumberOfMessages()) { @@ -130,7 +128,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { } @Override - public PageCache getPageCache(final long pageId) throws ActiveMQException { + public PageCache getPageCache(final long pageId) { try { PageCache cache; synchronized (softCache) { @@ -157,8 +155,8 @@ public class PageCursorProviderImpl implements PageCursorProvider { return cache; } - catch (Throwable e) { - throw new ActiveMQIOErrorException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e); + catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index d7a6ded065..9c1702e208 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -333,7 +333,7 @@ final class PageSubscriptionImpl implements PageSubscription { return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + ", filter = " + filter + "]"; } - private PagedReference getReference(PagePosition pos) throws ActiveMQException { + private PagedReference getReference(PagePosition pos) { return cursorProvider.newReference(pos, cursorProvider.getMessage(pos), this); } @@ -342,7 +342,7 @@ final class PageSubscriptionImpl implements PageSubscription { return new CursorIterator(); } - private PagedReference internalGetNext(final PagePosition pos) throws ActiveMQException { + private PagedReference internalGetNext(final PagePosition pos) { PagePosition retPos = pos.nextMessage(); PageCache cache = cursorProvider.getPageCache(pos.getPageNr()); @@ -471,17 +471,11 @@ final class PageSubscriptionImpl implements PageSubscription { public void onError(final int errorCode, final String errorMessage) { error = " errorCode=" + errorCode + ", msg=" + errorMessage; ActiveMQServerLogger.LOGGER.pageSubscriptionError(this, error); - getPagingStore().criticalException(new ActiveMQException(errorMessage)); } @Override public void done() { - try { - processACK(position); - } - catch (ActiveMQException e) { - getPagingStore().criticalException(e); - } + processACK(position); } @Override @@ -511,11 +505,10 @@ final class PageSubscriptionImpl implements PageSubscription { @Override public void addPendingDelivery(final PagePosition position) { - try { - getPageInfo(position).incrementPendingTX(); - } - catch (Exception e) { - getPagingStore().criticalException(e); + PageCursorInfo info = getPageInfo(position); + + if (info != null) { + info.incrementPendingTX(); } } @@ -535,7 +528,7 @@ final class PageSubscriptionImpl implements PageSubscription { } @Override - public PagedMessage queryMessage(PagePosition pos) throws ActiveMQException { + public PagedMessage queryMessage(PagePosition pos) { return cursorProvider.getMessage(pos); } @@ -554,32 +547,17 @@ final class PageSubscriptionImpl implements PageSubscription { @Override public void reloadPreparedACK(final Transaction tx, final PagePosition position) { deliveredCount.incrementAndGet(); - try { - installTXCallback(tx, position); - } - catch (Exception e) { - getPagingStore().criticalException(e); - } + installTXCallback(tx, position); } @Override public void positionIgnored(final PagePosition position) { - try { - processACK(position); - } - catch (Exception e) { - getPagingStore().criticalException(e); - } + processACK(position); } public void lateDeliveryRollback(PagePosition position) { - try { - PageCursorInfo cursorInfo = processACK(position); - cursorInfo.decrementPendingTX(); - } - catch (ActiveMQException e) { - getPagingStore().criticalException(e); - } + PageCursorInfo cursorInfo = processACK(position); + cursorInfo.decrementPendingTX(); } @Override @@ -750,15 +728,15 @@ final class PageSubscriptionImpl implements PageSubscription { } @Override - public void reloadPageInfo(long pageNr) throws ActiveMQException { + public void reloadPageInfo(long pageNr) { getPageInfo(pageNr, true); } - private PageCursorInfo getPageInfo(final PagePosition pos) throws ActiveMQException { + private PageCursorInfo getPageInfo(final PagePosition pos) { return getPageInfo(pos.getPageNr(), true); } - private PageCursorInfo getPageInfo(final long pageNr, boolean create) throws ActiveMQException { + private PageCursorInfo getPageInfo(final long pageNr, boolean create) { synchronized (consumedPages) { PageCursorInfo pageInfo = consumedPages.get(pageNr); @@ -792,7 +770,7 @@ final class PageSubscriptionImpl implements PageSubscription { // To be called only after the ACK has been processed and guaranteed to be on storage // The only exception is on non storage events such as not matching messages - private PageCursorInfo processACK(final PagePosition pos) throws ActiveMQException { + private PageCursorInfo processACK(final PagePosition pos) { if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0) { if (isTrace) { ActiveMQServerLogger.LOGGER.trace("a new position is being processed as ACK"); @@ -828,7 +806,7 @@ final class PageSubscriptionImpl implements PageSubscription { * @param tx * @param position */ - private void installTXCallback(final Transaction tx, final PagePosition position) throws ActiveMQException { + private void installTXCallback(final Transaction tx, final PagePosition position) { if (position.getRecordID() >= 0) { // It needs to persist, otherwise the cursor will return to the fist page position tx.setContainsPersistent(); @@ -960,13 +938,7 @@ final class PageSubscriptionImpl implements PageSubscription { } public boolean isDone() { - try { - return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0); - } - catch (ActiveMQException e) { - getPagingStore().criticalException(e); - throw new RuntimeException(e.getMessage(), e); - } + return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0); } public boolean isPendingDelete() { @@ -1047,7 +1019,7 @@ final class PageSubscriptionImpl implements PageSubscription { } } - private int getNumberOfMessages() throws ActiveMQException { + private int getNumberOfMessages() { if (wasLive) { // if the page was live at any point, we need to // get the number of messages from the page-cache @@ -1089,12 +1061,7 @@ final class PageSubscriptionImpl implements PageSubscription { List positions = entry.getValue(); for (PagePosition confirmed : positions) { - try { - cursor.processACK(confirmed); - } - catch (ActiveMQException e) { - getPagingStore().criticalException(e); - } + cursor.processACK(confirmed); cursor.deliveredCount.decrementAndGet(); } @@ -1165,21 +1132,15 @@ final class PageSubscriptionImpl implements PageSubscription { return currentDelivery; } - try { - if (position == null) { - position = getStartPosition(); - } + if (position == null) { + position = getStartPosition(); + } - currentDelivery = moveNext(); - return currentDelivery; - } - catch (ActiveMQException e) { - getPagingStore().criticalException(e); - throw new IllegalStateException(e.getMessage(), e); - } + currentDelivery = moveNext(); + return currentDelivery; } - private PagedReference moveNext() throws ActiveMQException { + private PagedReference moveNext() { synchronized (PageSubscriptionImpl.this) { boolean match = false; @@ -1309,14 +1270,9 @@ final class PageSubscriptionImpl implements PageSubscription { deliveredCount.incrementAndGet(); PagedReference delivery = currentDelivery; if (delivery != null) { - try { - PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(currentDelivery.getPosition()); - if (info != null) { - info.remove(currentDelivery.getPosition()); - } - } - catch (ActiveMQException e) { - getPagingStore().criticalException(e); + PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(currentDelivery.getPosition()); + if (info != null) { + info.remove(currentDelivery.getPosition()); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 9136c179ed..1463b3cdba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -176,12 +176,6 @@ public class PagingStoreImpl implements PagingStore { } - @Override - public void criticalException(Throwable e) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - storeFactory.criticalException(e); - } - /** * @param addressSettings */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index 95b30b27c1..0ff55ac1b8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.artemis.core.server; -import org.apache.activemq.artemis.api.core.ActiveMQException; - /** * A reference to a message. * @@ -27,7 +25,7 @@ public interface MessageReference { boolean isPaged(); - ServerMessage getMessage() throws ActiveMQException; + ServerMessage getMessage(); /** * We define this method aggregation here because on paging we need to hold the original estimate, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 9ea60cd5e3..81bd5653dc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -239,7 +239,7 @@ public interface Queue extends Bindable { */ void deliverScheduledMessages() throws ActiveMQException; - void postAcknowledge(MessageReference ref) throws ActiveMQException; + void postAcknowledge(MessageReference ref); float getRate(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index c6d5aeed1d..5420688e18 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; @@ -67,15 +66,7 @@ public class LastValueQueue extends QueueImpl { @Override public synchronized void addTail(final MessageReference ref, final boolean direct) { - SimpleString prop; - - try { - prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); - } - catch (ActiveMQException e) { - criticalError(e); - throw new IllegalStateException(e); - } + SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); if (prop != null) { HolderReference hr = map.get(prop); @@ -112,59 +103,45 @@ public class LastValueQueue extends QueueImpl { @Override public synchronized void addHead(final MessageReference ref) { - try { - SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); + SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); - if (prop != null) { - HolderReference hr = map.get(prop); + if (prop != null) { + HolderReference hr = map.get(prop); - if (hr != null) { - // We keep the current ref and ack the one we are returning + if (hr != null) { + // We keep the current ref and ack the one we are returning - super.referenceHandled(); + super.referenceHandled(); - try { - super.acknowledge(ref); - } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); - } + try { + super.acknowledge(ref); } - else { - map.put(prop, (HolderReference) ref); - - super.addHead(ref); + catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); } } else { + map.put(prop, (HolderReference) ref); + super.addHead(ref); } } - catch (ActiveMQException e) { - criticalError(e); - throw new IllegalStateException(e); + else { + super.addHead(ref); } } @Override protected void refRemoved(MessageReference ref) { - try { + synchronized (this) { + SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); - synchronized (this) { - SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); - - if (prop != null) { - map.remove(prop); - } + if (prop != null) { + map.remove(prop); } - - super.refRemoved(ref); - } - catch (ActiveMQException e) { - criticalError(e); - throw new IllegalStateException(e); } + super.refRemoved(ref); } private class HolderReference implements MessageReference { @@ -223,13 +200,7 @@ public class LastValueQueue extends QueueImpl { @Override public ServerMessage getMessage() { - try { - return ref.getMessage(); - } - catch (ActiveMQException e) { - criticalError(e); - throw new IllegalStateException(e); - } + return ref.getMessage(); } @Override @@ -285,13 +256,7 @@ public class LastValueQueue extends QueueImpl { */ @Override public int getMessageMemoryEstimate() { - try { - return ref.getMessage().getMemoryEstimate(); - } - catch (ActiveMQException e) { - criticalError(e); - throw new IllegalStateException(e); - } + return ref.getMessage().getMemoryEstimate(); } /* (non-Javadoc) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index c963e4d9e5..12b5231459 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1054,13 +1054,7 @@ public class QueueImpl implements Queue { @Override public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck) { - try { - getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference); - } - catch (ActiveMQException e) { - criticalError(e); - getPageSubscription().getPagingStore().criticalException(e); - } + getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference); } @Override @@ -1777,12 +1771,7 @@ public class QueueImpl implements Queue { private synchronized void internalAddTail(final MessageReference ref) { refAdded(ref); - try { - messageReferences.addTail(ref, ref.getMessage().getPriority()); - } - catch (ActiveMQException e) { - criticalError(e); - } + messageReferences.addTail(ref, getPriority(ref)); } /** @@ -1793,18 +1782,22 @@ public class QueueImpl implements Queue { * @param ref */ private void internalAddHead(final MessageReference ref) { - try { - queueMemorySize.addAndGet(ref.getMessageMemoryEstimate()); - refAdded(ref); - messageReferences.addHead(ref, ref.getMessage().getPriority()); - } - catch (ActiveMQException e) { - criticalError(e); - } + queueMemorySize.addAndGet(ref.getMessageMemoryEstimate()); + refAdded(ref); + + int priority = getPriority(ref); + + messageReferences.addHead(ref, priority); } - void criticalError(ActiveMQException e) { - storageManager.criticalError(e); + private int getPriority(MessageReference ref) { + try { + return ref.getMessage().getPriority(); + } + catch (Throwable e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + return 4; // the default one in case of failure + } } private synchronized void doInternalPoll() { @@ -2036,9 +2029,9 @@ public class QueueImpl implements Queue { // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID); } - catch (ActiveMQException e) { - criticalError(e); - throw new IllegalStateException(e); + catch (Throwable e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + return null; } } } @@ -2543,9 +2536,9 @@ public class QueueImpl implements Queue { return false; } } - catch (ActiveMQException e) { - criticalError(e); - throw new IllegalStateException(e); + catch (Throwable e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + return false; } } @@ -2584,7 +2577,7 @@ public class QueueImpl implements Queue { } @Override - public void postAcknowledge(final MessageReference ref) throws ActiveMQException { + public void postAcknowledge(final MessageReference ref) { QueueImpl queue = (QueueImpl) ref.getQueue(); queue.decDelivering(); @@ -2594,9 +2587,17 @@ public class QueueImpl implements Queue { return; } - final ServerMessage message = ref.getMessage(); + ServerMessage message; - boolean durableRef = message.isDurable() && queue.durable; + try { + message = ref.getMessage(); + } + catch (Throwable e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + message = null; + } + + boolean durableRef = message != null && message.isDurable() && queue.durable; try { message.decrementRefCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index 92d1a61be3..9b72f51af1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -22,7 +22,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; @@ -38,7 +37,7 @@ public class RefsOperation extends TransactionOperationAbstract { private Queue queue; List refsToAck = new ArrayList<>(); - List pagedMessagesToPostACK = null; + List pagedMessagesToPostACK = null; /** * It will ignore redelivery check, which is used during consumer.close @@ -56,13 +55,13 @@ public class RefsOperation extends TransactionOperationAbstract { ignoreRedeliveryCheck = true; } - synchronized void addAck(final MessageReference ref) throws ActiveMQException { + synchronized void addAck(final MessageReference ref) { refsToAck.add(ref); if (ref.isPaged()) { if (pagedMessagesToPostACK == null) { pagedMessagesToPostACK = new ArrayList<>(); } - pagedMessagesToPostACK.add(ref.getMessage()); + pagedMessagesToPostACK.add(ref); } } @@ -148,32 +147,26 @@ public class RefsOperation extends TransactionOperationAbstract { public void afterCommit(final Transaction tx) { for (MessageReference ref : refsToAck) { synchronized (ref.getQueue()) { - try { - queue.postAcknowledge(ref); - } - catch (ActiveMQException e) { - if (queue instanceof QueueImpl) { - ((QueueImpl) queue).criticalError(e); - } - else { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - } - } + queue.postAcknowledge(ref); } } if (pagedMessagesToPostACK != null) { - for (ServerMessage msg : pagedMessagesToPostACK) { - try { - msg.decrementRefCount(); - } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - } + for (MessageReference refmsg : pagedMessagesToPostACK) { + decrementRefCount(refmsg); } } } + private void decrementRefCount(MessageReference refmsg) { + try { + refmsg.getMessage().decrementRefCount(); + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } + } + @Override public synchronized List getRelatedMessageReferences() { List listRet = new LinkedList<>(); diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingOMETest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingOMETest.java new file mode 100644 index 0000000000..aa3ef3819e --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingOMETest.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.extras.byteman; + +import java.nio.ByteBuffer; +import java.util.HashMap; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class PagingOMETest extends ActiveMQTestBase { + + private ServerLocator locator; + private ActiveMQServer server; + private ClientSessionFactory sf; + static final int MESSAGE_SIZE = 1024; // 1k + + private static final int RECEIVE_TIMEOUT = 5000; + + private static final int PAGE_MAX = 100 * 1024; + + private static final int PAGE_SIZE = 10 * 1024; + + static final SimpleString ADDRESS = new SimpleString("SimpleAddress"); + + static boolean failureActive = false; + + public static void refCheck() { + if (failureActive) { + throw new OutOfMemoryError("fake error"); + } + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + failureActive = false; + locator = createInVMNonHALocator(); + } + + @Test + @BMRules( + rules = {@BMRule( + name = "fakeOME", + targetClass = "org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl", + targetMethod = "getPagedMessage", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.PagingOMETest.refCheck()")}) + public void testPageCleanup() throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultConfig(false); + + config.setJournalSyncNonTransactional(false); + + HashMap map = new HashMap<>(); + AddressSettings value = new AddressSettings(); + map.put(ADDRESS.toString(), value); + server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map); + + server.start(); + + final int numberOfMessages = 2; + + locator = createInVMNonHALocator(); + + locator.setBlockOnNonDurableSend(true); + locator.setBlockOnDurableSend(true); + locator.setBlockOnAcknowledge(false); + locator.setConsumerWindowSize(0); + + sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, false, false); + + session.createQueue(ADDRESS, ADDRESS, null, true); + + Queue queue = server.locateQueue(ADDRESS); + queue.getPageSubscription().getPagingStore().startPaging(); + + Assert.assertTrue(queue.getPageSubscription().getPagingStore().isPaging()); + + ClientProducer producer = session.createProducer(PagingOMETest.ADDRESS); + + ClientMessage message = null; + + byte[] body = new byte[MESSAGE_SIZE]; + + ByteBuffer bb = ByteBuffer.wrap(body); + + for (int j = 1; j <= MESSAGE_SIZE; j++) { + bb.put(getSamplebyte(j)); + } + + for (int i = 0; i < numberOfMessages; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(body); + + producer.send(message); + if (i % 1000 == 0) { + session.commit(); + } + } + session.commit(); + + session = sf.createSession(false, false, false); + + session.start(); + + assertEquals(numberOfMessages, queue.getMessageCount()); + + // The consumer has to be created after the queue.getMessageCount assertion + // otherwise delivery could alter the messagecount and give us a false failure + ClientConsumer consumer = session.createConsumer(PagingOMETest.ADDRESS); + ClientMessage msg = null; + + msg = consumer.receive(1000); + + failureActive = true; + msg.individualAcknowledge(); + try { + session.commit(); + Assert.fail("exception expected"); + } + catch (Exception expected) { + } + failureActive = false; + session.rollback(); + + session.close(); + + sf.close(); + + locator.close(); + + server.stop(); + + server.start(); + + locator = createInVMNonHALocator(); + + locator.setBlockOnNonDurableSend(true); + locator.setBlockOnDurableSend(true); + locator.setBlockOnAcknowledge(false); + locator.setConsumerWindowSize(0); + + sf = createSessionFactory(locator); + + session = sf.createSession(false, false, false); + + consumer = session.createConsumer(PagingOMETest.ADDRESS); + + session.start(); + + for (int i = 0; i < numberOfMessages; i++) { + msg = consumer.receive(1000); + Assert.assertNotNull(msg); + msg.individualAcknowledge(); + } + Assert.assertNull(consumer.receiveImmediate()); + session.commit(); + + session.close(); + sf.close(); + server.stop(); + + } +} diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 6244330dd6..635135739f 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -256,10 +256,6 @@ public class PersistMultiThreadTest extends ActiveMQTestBase { return null; } - @Override - public void criticalException(Throwable e) { - } - @Override public int getNumberOfPages() { return 0;