git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@703975 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-10-13 08:57:07 +00:00
parent db0d5cd363
commit d261412058
5 changed files with 23 additions and 48 deletions

View File

@ -246,14 +246,6 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
return false; return false;
} }
/**
* Mark a message as already dispatched
* @param message
*/
public void dispatched(MessageReference message) {
//add it to the audit
isDuplicate(message.getMessageId());
}
/** /**
* set the audit * set the audit

View File

@ -37,11 +37,11 @@ import org.apache.commons.logging.LogFactory;
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener { public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class); private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
protected final Destination regionDestination; protected final Destination regionDestination;
protected final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> (); private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
private Iterator<Entry<MessageId, Message>> iterator = null;
protected boolean cacheEnabled=false; protected boolean cacheEnabled=false;
protected boolean batchResetNeeded = true; protected boolean batchResetNeeded = true;
protected boolean storeHasMessages = false; protected boolean storeHasMessages = false;
protected Iterator<Entry<MessageId, Message>> iterator = null;
protected int size; protected int size;
protected AbstractStoreCursor(Destination destination) { protected AbstractStoreCursor(Destination destination) {
@ -84,6 +84,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
} }
message.incrementReferenceCount(); message.incrementReferenceCount();
batchList.put(message.getMessageId(), message); batchList.put(message.getMessageId(), message);
clearIterator(true);
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring batched duplicated from store: " + message); LOG.debug("Ignoring batched duplicated from store: " + message);
@ -102,11 +103,25 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
this.iterator = this.batchList.entrySet().iterator(); clearIterator(true);
} }
public void release() { public synchronized void release() {
clearIterator(false);
}
private synchronized void clearIterator(boolean ensureIterator) {
boolean haveIterator = this.iterator != null;
this.iterator=null; this.iterator=null;
if(haveIterator&&ensureIterator) {
ensureIterator();
}
}
private synchronized void ensureIterator() {
if(this.iterator==null) {
this.iterator=this.batchList.entrySet().iterator();
}
} }
@ -117,16 +132,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
if (batchList.isEmpty()) { if (batchList.isEmpty()) {
try { try {
fillBatch(); fillBatch();
this.iterator = this.batchList.entrySet().iterator();
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to fill batch", e); LOG.error("Failed to fill batch", e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}else {
if (this.iterator==null) {
this.iterator=this.batchList.entrySet().iterator();
}
} }
ensureIterator();
return this.iterator.hasNext(); return this.iterator.hasNext();
} }
@ -192,6 +203,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
msg.decrementReferenceCount(); msg.decrementReferenceCount();
} }
batchList.clear(); batchList.clear();
clearIterator(false);
batchResetNeeded = true; batchResetNeeded = true;
this.cacheEnabled=false; this.cacheEnabled=false;
if (isStarted()) { if (isStarted()) {

View File

@ -255,11 +255,6 @@ public interface PendingMessageCursor extends Service {
*/ */
public boolean isTransient(); public boolean isTransient();
/**
* Mark a message as already dispatched
* @param message
*/
public void dispatched(MessageReference message);
/** /**
* set the audit * set the audit

View File

@ -300,17 +300,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
} }
} }
/**
* Mark a message as already dispatched
* @param message
*/
public synchronized void dispatched(MessageReference message) {
super.dispatched(message);
for (PendingMessageCursor cursor : storePrefetches) {
cursor.dispatched(message);
}
}
protected synchronized PendingMessageCursor getNextCursor() throws Exception { protected synchronized PendingMessageCursor getNextCursor() throws Exception {
if (currentCursor == null || currentCursor.isEmpty()) { if (currentCursor == null || currentCursor.isEmpty()) {
currentCursor = null; currentCursor = null;

View File

@ -64,19 +64,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
throw new RuntimeException("Not supported"); throw new RuntimeException("Not supported");
} }
/**
* Mark a message as already dispatched
* @param message
*/
public synchronized void dispatched(MessageReference message) {
if (this.audit != null) {
isDuplicate(message.getMessageId());
Message removed = this.batchList.remove(message.getMessageId());
if (removed != null) {
removed.decrementReferenceCount();
}
}
}
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext(); MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext();