ARTEMIS-332 - test added / better dealing with critical errors on paging

This commit is contained in:
Clebert Suconic 2016-01-06 15:45:19 -05:00
parent d4dd070230
commit a5a993ed9d
15 changed files with 321 additions and 226 deletions
artemis-server/src/main/java/org/apache/activemq/artemis/core
tests
extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman
performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage

View File

@ -47,8 +47,6 @@ public interface PagingStore extends ActiveMQComponent {
int getNumberOfPages(); int getNumberOfPages();
void criticalException(Throwable e);
/** /**
* Returns the page id of the current page in which the system is writing files. * Returns the page id of the current page in which the system is writing files.
*/ */

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.core.paging.cursor; package org.apache.activemq.artemis.core.paging.cursor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
@ -25,7 +24,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
*/ */
public interface PageCursorProvider { public interface PageCursorProvider {
PageCache getPageCache(long pageNr) throws ActiveMQException; PageCache getPageCache(long pageNr);
PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub); PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub);
@ -39,7 +38,7 @@ public interface PageCursorProvider {
PageSubscription createSubscription(long queueId, Filter filter, boolean durable); PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
PagedMessage getMessage(PagePosition pos) throws ActiveMQException; PagedMessage getMessage(PagePosition pos);
void processReload() throws Exception; void processReload() throws Exception;

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.paging.cursor;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.paging.impl.Page;
@ -96,7 +95,7 @@ public interface PageSubscription {
void reloadPageCompletion(PagePosition position); void reloadPageCompletion(PagePosition position);
void reloadPageInfo(long pageNr) throws ActiveMQException; void reloadPageInfo(long pageNr);
/** /**
* To be called when the cursor decided to ignore a position. * To be called when the cursor decided to ignore a position.
@ -148,7 +147,7 @@ public interface PageSubscription {
* @param pos * @param pos
* @return * @return
*/ */
PagedMessage queryMessage(PagePosition pos) throws ActiveMQException; PagedMessage queryMessage(PagePosition pos);
/** /**
* @return executor used by the PageSubscription * @return executor used by the PageSubscription

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.core.paging.cursor; package org.apache.activemq.artemis.core.paging.cursor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -24,5 +23,5 @@ public interface PagedReference extends MessageReference {
PagePosition getPosition(); PagePosition getPosition();
PagedMessage getPagedMessage() throws ActiveMQException; PagedMessage getPagedMessage();
} }

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.paging.cursor;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -50,12 +49,12 @@ public class PagedReferenceImpl implements PagedReference {
private boolean alreadyAcked; private boolean alreadyAcked;
@Override @Override
public ServerMessage getMessage() throws ActiveMQException { public ServerMessage getMessage() {
return getPagedMessage().getMessage(); return getPagedMessage().getMessage();
} }
@Override @Override
public synchronized PagedMessage getPagedMessage() throws ActiveMQException { public synchronized PagedMessage getPagedMessage() {
PagedMessage returnMessage = message != null ? message.get() : null; PagedMessage returnMessage = message != null ? message.get() : null;
// We only keep a few references on the Queue from paging... // We only keep a few references on the Queue from paging...
@ -111,7 +110,7 @@ public class PagedReferenceImpl implements PagedReference {
try { try {
messageEstimate = getMessage().getMemoryEstimate(); messageEstimate = getMessage().getMemoryEstimate();
} }
catch (ActiveMQException e) { catch (Throwable e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
} }
} }
@ -120,14 +119,8 @@ public class PagedReferenceImpl implements PagedReference {
@Override @Override
public MessageReference copy(final Queue queue) { public MessageReference copy(final Queue queue) {
try {
return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription); return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription);
} }
catch (ActiveMQException e) {
ActiveMQServerLogger.LOGGER.warn(e);
return this;
}
}
@Override @Override
public long getScheduledDeliveryTime() { public long getScheduledDeliveryTime() {
@ -141,7 +134,7 @@ public class PagedReferenceImpl implements PagedReference {
deliveryTime = 0L; deliveryTime = 0L;
} }
} }
catch (ActiveMQException e) { catch (Throwable e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
return 0L; return 0L;
} }

View File

@ -24,8 +24,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
@ -111,7 +109,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
} }
@Override @Override
public PagedMessage getMessage(final PagePosition pos) throws ActiveMQException { public PagedMessage getMessage(final PagePosition pos) {
PageCache cache = getPageCache(pos.getPageNr()); PageCache cache = getPageCache(pos.getPageNr());
if (cache == null || pos.getMessageNr() >= cache.getNumberOfMessages()) { if (cache == null || pos.getMessageNr() >= cache.getNumberOfMessages()) {
@ -130,7 +128,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
} }
@Override @Override
public PageCache getPageCache(final long pageId) throws ActiveMQException { public PageCache getPageCache(final long pageId) {
try { try {
PageCache cache; PageCache cache;
synchronized (softCache) { synchronized (softCache) {
@ -157,8 +155,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
return cache; return cache;
} }
catch (Throwable e) { catch (Exception e) {
throw new ActiveMQIOErrorException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e); throw new RuntimeException(e.getMessage(), e);
} }
} }

View File

@ -333,7 +333,7 @@ final class PageSubscriptionImpl implements PageSubscription {
return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + ", filter = " + filter + "]"; return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + ", filter = " + filter + "]";
} }
private PagedReference getReference(PagePosition pos) throws ActiveMQException { private PagedReference getReference(PagePosition pos) {
return cursorProvider.newReference(pos, cursorProvider.getMessage(pos), this); return cursorProvider.newReference(pos, cursorProvider.getMessage(pos), this);
} }
@ -342,7 +342,7 @@ final class PageSubscriptionImpl implements PageSubscription {
return new CursorIterator(); return new CursorIterator();
} }
private PagedReference internalGetNext(final PagePosition pos) throws ActiveMQException { private PagedReference internalGetNext(final PagePosition pos) {
PagePosition retPos = pos.nextMessage(); PagePosition retPos = pos.nextMessage();
PageCache cache = cursorProvider.getPageCache(pos.getPageNr()); PageCache cache = cursorProvider.getPageCache(pos.getPageNr());
@ -471,18 +471,12 @@ final class PageSubscriptionImpl implements PageSubscription {
public void onError(final int errorCode, final String errorMessage) { public void onError(final int errorCode, final String errorMessage) {
error = " errorCode=" + errorCode + ", msg=" + errorMessage; error = " errorCode=" + errorCode + ", msg=" + errorMessage;
ActiveMQServerLogger.LOGGER.pageSubscriptionError(this, error); ActiveMQServerLogger.LOGGER.pageSubscriptionError(this, error);
getPagingStore().criticalException(new ActiveMQException(errorMessage));
} }
@Override @Override
public void done() { public void done() {
try {
processACK(position); processACK(position);
} }
catch (ActiveMQException e) {
getPagingStore().criticalException(e);
}
}
@Override @Override
public String toString() { public String toString() {
@ -511,11 +505,10 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override @Override
public void addPendingDelivery(final PagePosition position) { public void addPendingDelivery(final PagePosition position) {
try { PageCursorInfo info = getPageInfo(position);
getPageInfo(position).incrementPendingTX();
} if (info != null) {
catch (Exception e) { info.incrementPendingTX();
getPagingStore().criticalException(e);
} }
} }
@ -535,7 +528,7 @@ final class PageSubscriptionImpl implements PageSubscription {
} }
@Override @Override
public PagedMessage queryMessage(PagePosition pos) throws ActiveMQException { public PagedMessage queryMessage(PagePosition pos) {
return cursorProvider.getMessage(pos); return cursorProvider.getMessage(pos);
} }
@ -554,33 +547,18 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override @Override
public void reloadPreparedACK(final Transaction tx, final PagePosition position) { public void reloadPreparedACK(final Transaction tx, final PagePosition position) {
deliveredCount.incrementAndGet(); deliveredCount.incrementAndGet();
try {
installTXCallback(tx, position); installTXCallback(tx, position);
} }
catch (Exception e) {
getPagingStore().criticalException(e);
}
}
@Override @Override
public void positionIgnored(final PagePosition position) { public void positionIgnored(final PagePosition position) {
try {
processACK(position); processACK(position);
} }
catch (Exception e) {
getPagingStore().criticalException(e);
}
}
public void lateDeliveryRollback(PagePosition position) { public void lateDeliveryRollback(PagePosition position) {
try {
PageCursorInfo cursorInfo = processACK(position); PageCursorInfo cursorInfo = processACK(position);
cursorInfo.decrementPendingTX(); cursorInfo.decrementPendingTX();
} }
catch (ActiveMQException e) {
getPagingStore().criticalException(e);
}
}
@Override @Override
public boolean isComplete(long page) { public boolean isComplete(long page) {
@ -750,15 +728,15 @@ final class PageSubscriptionImpl implements PageSubscription {
} }
@Override @Override
public void reloadPageInfo(long pageNr) throws ActiveMQException { public void reloadPageInfo(long pageNr) {
getPageInfo(pageNr, true); getPageInfo(pageNr, true);
} }
private PageCursorInfo getPageInfo(final PagePosition pos) throws ActiveMQException { private PageCursorInfo getPageInfo(final PagePosition pos) {
return getPageInfo(pos.getPageNr(), true); return getPageInfo(pos.getPageNr(), true);
} }
private PageCursorInfo getPageInfo(final long pageNr, boolean create) throws ActiveMQException { private PageCursorInfo getPageInfo(final long pageNr, boolean create) {
synchronized (consumedPages) { synchronized (consumedPages) {
PageCursorInfo pageInfo = consumedPages.get(pageNr); PageCursorInfo pageInfo = consumedPages.get(pageNr);
@ -792,7 +770,7 @@ final class PageSubscriptionImpl implements PageSubscription {
// To be called only after the ACK has been processed and guaranteed to be on storage // To be called only after the ACK has been processed and guaranteed to be on storage
// The only exception is on non storage events such as not matching messages // The only exception is on non storage events such as not matching messages
private PageCursorInfo processACK(final PagePosition pos) throws ActiveMQException { private PageCursorInfo processACK(final PagePosition pos) {
if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0) { if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0) {
if (isTrace) { if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("a new position is being processed as ACK"); ActiveMQServerLogger.LOGGER.trace("a new position is being processed as ACK");
@ -828,7 +806,7 @@ final class PageSubscriptionImpl implements PageSubscription {
* @param tx * @param tx
* @param position * @param position
*/ */
private void installTXCallback(final Transaction tx, final PagePosition position) throws ActiveMQException { private void installTXCallback(final Transaction tx, final PagePosition position) {
if (position.getRecordID() >= 0) { if (position.getRecordID() >= 0) {
// It needs to persist, otherwise the cursor will return to the fist page position // It needs to persist, otherwise the cursor will return to the fist page position
tx.setContainsPersistent(); tx.setContainsPersistent();
@ -960,14 +938,8 @@ final class PageSubscriptionImpl implements PageSubscription {
} }
public boolean isDone() { public boolean isDone() {
try {
return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0); return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0);
} }
catch (ActiveMQException e) {
getPagingStore().criticalException(e);
throw new RuntimeException(e.getMessage(), e);
}
}
public boolean isPendingDelete() { public boolean isPendingDelete() {
return pendingDelete || completePage != null; return pendingDelete || completePage != null;
@ -1047,7 +1019,7 @@ final class PageSubscriptionImpl implements PageSubscription {
} }
} }
private int getNumberOfMessages() throws ActiveMQException { private int getNumberOfMessages() {
if (wasLive) { if (wasLive) {
// if the page was live at any point, we need to // if the page was live at any point, we need to
// get the number of messages from the page-cache // get the number of messages from the page-cache
@ -1089,12 +1061,7 @@ final class PageSubscriptionImpl implements PageSubscription {
List<PagePosition> positions = entry.getValue(); List<PagePosition> positions = entry.getValue();
for (PagePosition confirmed : positions) { for (PagePosition confirmed : positions) {
try {
cursor.processACK(confirmed); cursor.processACK(confirmed);
}
catch (ActiveMQException e) {
getPagingStore().criticalException(e);
}
cursor.deliveredCount.decrementAndGet(); cursor.deliveredCount.decrementAndGet();
} }
@ -1165,7 +1132,6 @@ final class PageSubscriptionImpl implements PageSubscription {
return currentDelivery; return currentDelivery;
} }
try {
if (position == null) { if (position == null) {
position = getStartPosition(); position = getStartPosition();
} }
@ -1173,13 +1139,8 @@ final class PageSubscriptionImpl implements PageSubscription {
currentDelivery = moveNext(); currentDelivery = moveNext();
return currentDelivery; return currentDelivery;
} }
catch (ActiveMQException e) {
getPagingStore().criticalException(e);
throw new IllegalStateException(e.getMessage(), e);
}
}
private PagedReference moveNext() throws ActiveMQException { private PagedReference moveNext() {
synchronized (PageSubscriptionImpl.this) { synchronized (PageSubscriptionImpl.this) {
boolean match = false; boolean match = false;
@ -1309,16 +1270,11 @@ final class PageSubscriptionImpl implements PageSubscription {
deliveredCount.incrementAndGet(); deliveredCount.incrementAndGet();
PagedReference delivery = currentDelivery; PagedReference delivery = currentDelivery;
if (delivery != null) { if (delivery != null) {
try {
PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(currentDelivery.getPosition()); PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(currentDelivery.getPosition());
if (info != null) { if (info != null) {
info.remove(currentDelivery.getPosition()); info.remove(currentDelivery.getPosition());
} }
} }
catch (ActiveMQException e) {
getPagingStore().criticalException(e);
}
}
} }
@Override @Override

View File

@ -176,12 +176,6 @@ public class PagingStoreImpl implements PagingStore {
} }
@Override
public void criticalException(Throwable e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
storeFactory.criticalException(e);
}
/** /**
* @param addressSettings * @param addressSettings
*/ */

View File

@ -16,8 +16,6 @@
*/ */
package org.apache.activemq.artemis.core.server; package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.ActiveMQException;
/** /**
* A reference to a message. * A reference to a message.
* *
@ -27,7 +25,7 @@ public interface MessageReference {
boolean isPaged(); boolean isPaged();
ServerMessage getMessage() throws ActiveMQException; ServerMessage getMessage();
/** /**
* We define this method aggregation here because on paging we need to hold the original estimate, * We define this method aggregation here because on paging we need to hold the original estimate,

View File

@ -239,7 +239,7 @@ public interface Queue extends Bindable {
*/ */
void deliverScheduledMessages() throws ActiveMQException; void deliverScheduledMessages() throws ActiveMQException;
void postAcknowledge(MessageReference ref) throws ActiveMQException; void postAcknowledge(MessageReference ref);
float getRate(); float getRate();

View File

@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
@ -67,15 +66,7 @@ public class LastValueQueue extends QueueImpl {
@Override @Override
public synchronized void addTail(final MessageReference ref, final boolean direct) { public synchronized void addTail(final MessageReference ref, final boolean direct) {
SimpleString prop; SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
try {
prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
}
catch (ActiveMQException e) {
criticalError(e);
throw new IllegalStateException(e);
}
if (prop != null) { if (prop != null) {
HolderReference hr = map.get(prop); HolderReference hr = map.get(prop);
@ -112,7 +103,6 @@ public class LastValueQueue extends QueueImpl {
@Override @Override
public synchronized void addHead(final MessageReference ref) { public synchronized void addHead(final MessageReference ref) {
try {
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
if (prop != null) { if (prop != null) {
@ -140,16 +130,9 @@ public class LastValueQueue extends QueueImpl {
super.addHead(ref); super.addHead(ref);
} }
} }
catch (ActiveMQException e) {
criticalError(e);
throw new IllegalStateException(e);
}
}
@Override @Override
protected void refRemoved(MessageReference ref) { protected void refRemoved(MessageReference ref) {
try {
synchronized (this) { synchronized (this) {
SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
@ -160,12 +143,6 @@ public class LastValueQueue extends QueueImpl {
super.refRemoved(ref); super.refRemoved(ref);
} }
catch (ActiveMQException e) {
criticalError(e);
throw new IllegalStateException(e);
}
}
private class HolderReference implements MessageReference { private class HolderReference implements MessageReference {
@ -223,14 +200,8 @@ public class LastValueQueue extends QueueImpl {
@Override @Override
public ServerMessage getMessage() { public ServerMessage getMessage() {
try {
return ref.getMessage(); return ref.getMessage();
} }
catch (ActiveMQException e) {
criticalError(e);
throw new IllegalStateException(e);
}
}
@Override @Override
public Queue getQueue() { public Queue getQueue() {
@ -285,14 +256,8 @@ public class LastValueQueue extends QueueImpl {
*/ */
@Override @Override
public int getMessageMemoryEstimate() { public int getMessageMemoryEstimate() {
try {
return ref.getMessage().getMemoryEstimate(); return ref.getMessage().getMemoryEstimate();
} }
catch (ActiveMQException e) {
criticalError(e);
throw new IllegalStateException(e);
}
}
/* (non-Javadoc) /* (non-Javadoc)
* @see org.apache.activemq.artemis.core.server.MessageReference#setConsumerId(java.lang.Long) * @see org.apache.activemq.artemis.core.server.MessageReference#setConsumerId(java.lang.Long)

View File

@ -1054,14 +1054,8 @@ public class QueueImpl implements Queue {
@Override @Override
public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck) { public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck) {
try {
getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference); getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
} }
catch (ActiveMQException e) {
criticalError(e);
getPageSubscription().getPagingStore().criticalException(e);
}
}
@Override @Override
public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception { public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception {
@ -1777,12 +1771,7 @@ public class QueueImpl implements Queue {
private synchronized void internalAddTail(final MessageReference ref) { private synchronized void internalAddTail(final MessageReference ref) {
refAdded(ref); refAdded(ref);
try { messageReferences.addTail(ref, getPriority(ref));
messageReferences.addTail(ref, ref.getMessage().getPriority());
}
catch (ActiveMQException e) {
criticalError(e);
}
} }
/** /**
@ -1793,18 +1782,22 @@ public class QueueImpl implements Queue {
* @param ref * @param ref
*/ */
private void internalAddHead(final MessageReference ref) { private void internalAddHead(final MessageReference ref) {
try {
queueMemorySize.addAndGet(ref.getMessageMemoryEstimate()); queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
refAdded(ref); refAdded(ref);
messageReferences.addHead(ref, ref.getMessage().getPriority());
} int priority = getPriority(ref);
catch (ActiveMQException e) {
criticalError(e); messageReferences.addHead(ref, priority);
}
} }
void criticalError(ActiveMQException e) { private int getPriority(MessageReference ref) {
storageManager.criticalError(e); try {
return ref.getMessage().getPriority();
}
catch (Throwable e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
return 4; // the default one in case of failure
}
} }
private synchronized void doInternalPoll() { private synchronized void doInternalPoll() {
@ -2036,9 +2029,9 @@ public class QueueImpl implements Queue {
// But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID); return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
} }
catch (ActiveMQException e) { catch (Throwable e) {
criticalError(e); ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
throw new IllegalStateException(e); return null;
} }
} }
} }
@ -2543,9 +2536,9 @@ public class QueueImpl implements Queue {
return false; return false;
} }
} }
catch (ActiveMQException e) { catch (Throwable e) {
criticalError(e); ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
throw new IllegalStateException(e); return false;
} }
} }
@ -2584,7 +2577,7 @@ public class QueueImpl implements Queue {
} }
@Override @Override
public void postAcknowledge(final MessageReference ref) throws ActiveMQException { public void postAcknowledge(final MessageReference ref) {
QueueImpl queue = (QueueImpl) ref.getQueue(); QueueImpl queue = (QueueImpl) ref.getQueue();
queue.decDelivering(); queue.decDelivering();
@ -2594,9 +2587,17 @@ public class QueueImpl implements Queue {
return; return;
} }
final ServerMessage message = ref.getMessage(); ServerMessage message;
boolean durableRef = message.isDurable() && queue.durable; try {
message = ref.getMessage();
}
catch (Throwable e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
message = null;
}
boolean durableRef = message != null && message.isDurable() && queue.durable;
try { try {
message.decrementRefCount(); message.decrementRefCount();

View File

@ -22,7 +22,6 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -38,7 +37,7 @@ public class RefsOperation extends TransactionOperationAbstract {
private Queue queue; private Queue queue;
List<MessageReference> refsToAck = new ArrayList<>(); List<MessageReference> refsToAck = new ArrayList<>();
List<ServerMessage> pagedMessagesToPostACK = null; List<MessageReference> pagedMessagesToPostACK = null;
/** /**
* It will ignore redelivery check, which is used during consumer.close * It will ignore redelivery check, which is used during consumer.close
@ -56,13 +55,13 @@ public class RefsOperation extends TransactionOperationAbstract {
ignoreRedeliveryCheck = true; ignoreRedeliveryCheck = true;
} }
synchronized void addAck(final MessageReference ref) throws ActiveMQException { synchronized void addAck(final MessageReference ref) {
refsToAck.add(ref); refsToAck.add(ref);
if (ref.isPaged()) { if (ref.isPaged()) {
if (pagedMessagesToPostACK == null) { if (pagedMessagesToPostACK == null) {
pagedMessagesToPostACK = new ArrayList<>(); pagedMessagesToPostACK = new ArrayList<>();
} }
pagedMessagesToPostACK.add(ref.getMessage()); pagedMessagesToPostACK.add(ref);
} }
} }
@ -148,31 +147,25 @@ public class RefsOperation extends TransactionOperationAbstract {
public void afterCommit(final Transaction tx) { public void afterCommit(final Transaction tx) {
for (MessageReference ref : refsToAck) { for (MessageReference ref : refsToAck) {
synchronized (ref.getQueue()) { synchronized (ref.getQueue()) {
try {
queue.postAcknowledge(ref); queue.postAcknowledge(ref);
} }
catch (ActiveMQException e) {
if (queue instanceof QueueImpl) {
((QueueImpl) queue).criticalError(e);
}
else {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
}
}
} }
if (pagedMessagesToPostACK != null) { if (pagedMessagesToPostACK != null) {
for (ServerMessage msg : pagedMessagesToPostACK) { for (MessageReference refmsg : pagedMessagesToPostACK) {
decrementRefCount(refmsg);
}
}
}
private void decrementRefCount(MessageReference refmsg) {
try { try {
msg.decrementRefCount(); refmsg.getMessage().decrementRefCount();
} }
catch (Exception e) { catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
} }
} }
}
}
@Override @Override
public synchronized List<MessageReference> getRelatedMessageReferences() { public synchronized List<MessageReference> getRelatedMessageReferences() {

View File

@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import java.nio.ByteBuffer;
import java.util.HashMap;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(BMUnitRunner.class)
public class PagingOMETest extends ActiveMQTestBase {
private ServerLocator locator;
private ActiveMQServer server;
private ClientSessionFactory sf;
static final int MESSAGE_SIZE = 1024; // 1k
private static final int RECEIVE_TIMEOUT = 5000;
private static final int PAGE_MAX = 100 * 1024;
private static final int PAGE_SIZE = 10 * 1024;
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
static boolean failureActive = false;
public static void refCheck() {
if (failureActive) {
throw new OutOfMemoryError("fake error");
}
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
failureActive = false;
locator = createInVMNonHALocator();
}
@Test
@BMRules(
rules = {@BMRule(
name = "fakeOME",
targetClass = "org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl",
targetMethod = "getPagedMessage",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.PagingOMETest.refCheck()")})
public void testPageCleanup() throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultConfig(false);
config.setJournalSyncNonTransactional(false);
HashMap<String, AddressSettings> map = new HashMap<>();
AddressSettings value = new AddressSettings();
map.put(ADDRESS.toString(), value);
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
final int numberOfMessages = 2;
locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(false);
locator.setConsumerWindowSize(0);
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
session.createQueue(ADDRESS, ADDRESS, null, true);
Queue queue = server.locateQueue(ADDRESS);
queue.getPageSubscription().getPagingStore().startPaging();
Assert.assertTrue(queue.getPageSubscription().getPagingStore().isPaging());
ClientProducer producer = session.createProducer(PagingOMETest.ADDRESS);
ClientMessage message = null;
byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= MESSAGE_SIZE; j++) {
bb.put(getSamplebyte(j));
}
for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
producer.send(message);
if (i % 1000 == 0) {
session.commit();
}
}
session.commit();
session = sf.createSession(false, false, false);
session.start();
assertEquals(numberOfMessages, queue.getMessageCount());
// The consumer has to be created after the queue.getMessageCount assertion
// otherwise delivery could alter the messagecount and give us a false failure
ClientConsumer consumer = session.createConsumer(PagingOMETest.ADDRESS);
ClientMessage msg = null;
msg = consumer.receive(1000);
failureActive = true;
msg.individualAcknowledge();
try {
session.commit();
Assert.fail("exception expected");
}
catch (Exception expected) {
}
failureActive = false;
session.rollback();
session.close();
sf.close();
locator.close();
server.stop();
server.start();
locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(false);
locator.setConsumerWindowSize(0);
sf = createSessionFactory(locator);
session = sf.createSession(false, false, false);
consumer = session.createConsumer(PagingOMETest.ADDRESS);
session.start();
for (int i = 0; i < numberOfMessages; i++) {
msg = consumer.receive(1000);
Assert.assertNotNull(msg);
msg.individualAcknowledge();
}
Assert.assertNull(consumer.receiveImmediate());
session.commit();
session.close();
sf.close();
server.stop();
}
}

View File

@ -256,10 +256,6 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
return null; return null;
} }
@Override
public void criticalException(Throwable e) {
}
@Override @Override
public int getNumberOfPages() { public int getNumberOfPages() {
return 0; return 0;