fix AMQ-1984 - hanging consumer receive after multiple consumer disconnect/reconnect

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@707415 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-10-23 16:52:27 +00:00
parent 58b8d19a97
commit 81f0cc0d04
3 changed files with 21 additions and 20 deletions

View File

@ -104,6 +104,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
}
clearIterator(true);
size();
}
public synchronized void release() {
@ -166,7 +167,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public final synchronized void remove() {
size--;
if (size==0 && isStarted() && cacheEnabled) {
if (size==0 && isStarted() && useCache) {
cacheEnabled=true;
}
if (iterator!=null) {
@ -177,6 +178,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public final synchronized void remove(MessageReference node) {
size--;
cacheEnabled=false;
batchList.remove(node.getMessageId());
}
@ -234,7 +236,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
public final synchronized boolean isEmpty() {
return size <= 0;
// negative means more messages added to store through queue.send since last reset
return size == 0;
}
public final synchronized boolean hasMessagesBufferedToDeliver() {
@ -242,12 +245,10 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
public final synchronized int size() {
if (isStarted()) {
return size;
if (size < 0) {
this.size = getStoreSize();
}
this.size = getStoreSize();
return size;
}

View File

@ -122,18 +122,13 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
public synchronized boolean hasNext() {
boolean result = true;//pendingCount > 0;
if (result) {
try {
currentCursor = getNextCursor();
} catch (Exception e) {
LOG.error("Failed to get current cursor ", e);
throw new RuntimeException(e);
}
result = currentCursor != null ? currentCursor.hasNext() : false;
}
return result;
try {
getNextCursor();
} catch (Exception e) {
LOG.error("Failed to get current cursor ", e);
throw new RuntimeException(e);
}
return currentCursor != null ? currentCursor.hasNext() : false;
}
public synchronized MessageReference next() {
@ -160,6 +155,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
public synchronized void reset() {
nonPersistent.reset();
persistent.reset();
pendingCount = persistent.size() + nonPersistent.size();
}
public void release() {
@ -169,11 +165,15 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
public synchronized int size() {
if (pendingCount < 0) {
pendingCount = persistent.size() + nonPersistent.size();
}
return pendingCount;
}
public synchronized boolean isEmpty() {
return pendingCount <= 0;
// if negative, more messages arrived in store since last reset so non empty
return pendingCount == 0;
}
/**
@ -259,6 +259,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
if (nonPersistent != null) {
nonPersistent.gc();
}
pendingCount = persistent.size() + nonPersistent.size();
}
public void setSystemUsage(SystemUsage usageManager) {

View File

@ -61,7 +61,6 @@ public class QueueMemoryFullMultiBrokersTest extends JmsMultipleBrokersTestSuppo
// give the acks a chance to flow
Thread.sleep(2000);
Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
System.err.println(internalQueue);
assertTrue("All messages are consumed and acked from source:" + internalQueue, internalQueue.getMessages().isEmpty());
assertEquals("messages source:" + internalQueue, 0, internalQueue.getDestinationStatistics().getMessages().getCount());