isolate cursor storeHasMessage logic into durable topic sub cursor b/c only durable sub cursors have selectors that won't match, otherwise we should always read a page if the store has messages

This commit is contained in:
gtully 2014-10-21 23:21:45 +01:00
parent 74d2c2425f
commit 8b8f630080
2 changed files with 21 additions and 16 deletions

View File

@ -42,7 +42,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
protected final PendingList batchList;
private Iterator<MessageReference> iterator = null;
protected boolean batchResetNeeded = false;
private boolean storeHasMessages = false;
protected int size;
private LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
private static int SYNC_ADD = 0;
@ -66,13 +65,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
super.start();
resetBatch();
resetSize();
setCacheEnabled(!this.storeHasMessages&&useCache);
setCacheEnabled(size==0&&useCache);
}
}
protected void resetSize() {
this.size = getStoreSize();
this.storeHasMessages=this.size > 0;
}
@Override
@ -93,7 +91,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
boolean recovered = false;
storeHasMessages = true;
if (recordUniqueId(message.getMessageId())) {
if (!cached) {
message.setRegionDestination(regionDestination);
@ -202,7 +199,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return result;
}
public final synchronized boolean addMessageLast(MessageReference node) throws Exception {
public synchronized boolean addMessageLast(MessageReference node) throws Exception {
boolean disableCache = false;
if (hasSpace()) {
if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
@ -230,7 +227,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
syncWithStore(node.getMessage());
setCacheEnabled(false);
}
this.storeHasMessages = true;
size++;
return true;
}
@ -380,18 +376,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
resetBatch();
this.batchResetNeeded = false;
}
if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
// avoid repeated trips to the store if there is nothing of interest
this.storeHasMessages = false;
if (this.batchList.isEmpty() && this.size >0) {
try {
doFillBatch();
} catch (Exception e) {
LOG.error("{} - Failed to fill batch", this, e);
throw new RuntimeException(e);
}
if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) {
this.storeHasMessages = true;
}
}
}
@ -417,7 +408,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
@Override
public String toString() {
return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
+ ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
+ ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
+ ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size()
+ ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null")
+ ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null");

View File

@ -40,6 +40,8 @@ class TopicStorePrefetch extends AbstractStoreCursor {
private final String subscriberName;
private final Subscription subscription;
private byte lastRecoveredPriority = 9;
private boolean storeHasMessages = false;
/**
* @param topic
* @param clientId
@ -54,6 +56,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
this.maxProducersToAudit=32;
this.maxAuditDepth=10000;
resetSize();
this.storeHasMessages=this.size > 0;
}
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
@ -65,8 +68,13 @@ class TopicStorePrefetch extends AbstractStoreCursor {
batchList.addMessageFirst(node);
size++;
}
@Override
public final synchronized boolean addMessageLast(MessageReference node) throws Exception {
this.storeHasMessages = super.addMessageLast(node);
return this.storeHasMessages;
}
@Override
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
LOG.trace("{} recover: {}, priority: {}", this, message.getMessageId(), message.getPriority());
@ -78,6 +86,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
if (recovered && !cached) {
lastRecoveredPriority = message.getPriority();
}
storeHasMessages = true;
}
return recovered;
}
@ -110,8 +119,13 @@ class TopicStorePrefetch extends AbstractStoreCursor {
@Override
protected void doFillBatch() throws Exception {
// avoid repeated trips to the store if there is nothing of interest
this.storeHasMessages = false;
this.store.recoverNextMessages(clientId, subscriberName,
maxBatchSize, this);
if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) {
this.storeHasMessages = true;
}
}
public byte getLastRecoveredPriority() {
@ -129,6 +143,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
@Override
public String toString() {
return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
return "TopicStorePrefetch(" + clientId + "," + subscriberName + ",storeHasMessages=" + this.storeHasMessages +") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
}
}