fix reference counting

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@578877 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-09-24 17:00:08 +00:00
parent 01bdc524e7
commit 2b063efafa
1 changed files with 15 additions and 6 deletions

View File

@ -28,11 +28,9 @@ import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener; import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** /**
* persist pending messages pending message (messages awaiting dispatch to a * persist pending messages pending message (messages awaiting dispatch to a
@ -42,7 +40,6 @@ import org.apache.commons.logging.LogFactory;
*/ */
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
private static final AtomicLong NAME_COUNT = new AtomicLong(); private static final AtomicLong NAME_COUNT = new AtomicLong();
private Store store; private Store store;
@ -54,6 +51,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
private boolean iterating; private boolean iterating;
private boolean flushRequired; private boolean flushRequired;
private AtomicBoolean started = new AtomicBoolean(); private AtomicBoolean started = new AtomicBoolean();
private MessageReference last = null;
/** /**
* @param name * @param name
@ -85,7 +83,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* @return true if there are no pending messages * @return true if there are no pending messages
*/ */
public synchronized boolean isEmpty() { public synchronized boolean isEmpty() {
return memoryList.isEmpty() && isDiskListEmpty(); boolean result = memoryList.isEmpty() && isDiskListEmpty();
return result;
} }
/** /**
@ -93,6 +92,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
*/ */
public synchronized void reset() { public synchronized void reset() {
iterating = true; iterating = true;
last = null;
iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator(); iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
} }
@ -145,6 +145,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
regionDestination = node.getMessage().getRegionDestination(); regionDestination = node.getMessage().getRegionDestination();
if (isSpaceInMemoryList()) { if (isSpaceInMemoryList()) {
memoryList.add(node); memoryList.add(node);
node.incrementReferenceCount();
} else { } else {
flushToDisk(); flushToDisk();
node.decrementReferenceCount(); node.decrementReferenceCount();
@ -166,6 +167,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
regionDestination = node.getMessage().getRegionDestination(); regionDestination = node.getMessage().getRegionDestination();
if (isSpaceInMemoryList()) { if (isSpaceInMemoryList()) {
memoryList.addFirst(node); memoryList.addFirst(node);
node.incrementReferenceCount();
} else { } else {
flushToDisk(); flushToDisk();
systemUsage.getTempUsage().waitForSpace(); systemUsage.getTempUsage().waitForSpace();
@ -189,6 +191,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
*/ */
public synchronized MessageReference next() { public synchronized MessageReference next() {
Message message = (Message)iter.next(); Message message = (Message)iter.next();
last = message;
if (!isDiskListEmpty()) { if (!isDiskListEmpty()) {
// got from disk // got from disk
message.setRegionDestination(regionDestination); message.setRegionDestination(regionDestination);
@ -202,6 +205,9 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
*/ */
public synchronized void remove() { public synchronized void remove() {
iter.remove(); iter.remove();
if (last != null) {
last.decrementReferenceCount();
}
} }
/** /**
@ -209,7 +215,9 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
*/ */
public synchronized void remove(MessageReference node) { public synchronized void remove(MessageReference node) {
memoryList.remove(node); if (memoryList.remove(node)) {
node.decrementReferenceCount();
}
if (!isDiskListEmpty()) { if (!isDiskListEmpty()) {
getDiskList().remove(node); getDiskList().remove(node);
} }
@ -230,6 +238,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
if (!isDiskListEmpty()) { if (!isDiskListEmpty()) {
getDiskList().clear(); getDiskList().clear();
} }
last=null;
} }
public synchronized boolean isFull() { public synchronized boolean isFull() {