https://issues.apache.org/jira/browse/AMQ-5266 https://issues.apache.org/jira/browse/AMQ-4485 - tidy up exception trace and async check, fix leveldb async test regressions

This commit is contained in:
gtully 2014-10-17 11:24:48 +01:00
parent 76e29bdf9c
commit 4705f95bec
1 changed files with 13 additions and 4 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.region.cursors;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -249,9 +250,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
try { try {
future.get(5, TimeUnit.SECONDS); future.get(5, TimeUnit.SECONDS);
setLastCachedId(ASYNC_ADD, lastPending); setLastCachedId(ASYNC_ADD, lastPending);
} catch (CancellationException ok) {
continue;
} catch (TimeoutException potentialDeadlock) { } catch (TimeoutException potentialDeadlock) {
LOG.warn("{} timed out waiting for async add", this, potentialDeadlock); LOG.debug("{} timed out waiting for async add", this, potentialDeadlock);
} catch (Exception cancelledOrTimeOutOrErrorWorstCaseWeReplay) {cancelledOrTimeOutOrErrorWorstCaseWeReplay.printStackTrace();} } catch (Exception worstCaseWeReplay) {
LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay);
}
} else { } else {
setLastCachedId(ASYNC_ADD, lastPending); setLastCachedId(ASYNC_ADD, lastPending);
} }
@ -259,7 +264,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
} }
if (lastCachedIds[ASYNC_ADD] != null) { if (lastCachedIds[ASYNC_ADD] != null) {
// ensure we don't skip current possibly sync add b/c we waited on the future // ensure we don't skip current possibly sync add b/c we waited on the future
if (currentAdd.isRecievedByDFBridge() || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) { if (isAsync(currentAdd) || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) {
setBatch(lastCachedIds[ASYNC_ADD]); setBatch(lastCachedIds[ASYNC_ADD]);
} }
} }
@ -272,7 +277,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
} }
private void trackLastCached(MessageReference node) { private void trackLastCached(MessageReference node) {
if (node.getMessageId().getFutureOrSequenceLong() instanceof Future || node.getMessage().isRecievedByDFBridge()) { if (isAsync(node.getMessage())) {
pruneLastCached(); pruneLastCached();
pendingCachedIds.add(node.getMessageId()); pendingCachedIds.add(node.getMessageId());
} else { } else {
@ -280,6 +285,10 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
} }
} }
private static final boolean isAsync(Message message) {
return message.isRecievedByDFBridge() || message.getMessageId().getFutureOrSequenceLong() instanceof Future;
}
private void pruneLastCached() { private void pruneLastCached() {
for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) { for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
MessageId candidate = it.next(); MessageId candidate = it.next();