Added support for hasMessagesToDeliver() method

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@490796 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-12-28 20:52:41 +00:00
parent 4a16f4527a
commit e5efc58cd4
5 changed files with 59 additions and 10 deletions

View File

@ -122,4 +122,8 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor{
public void release(){
}
public boolean hasMessagesBufferedToDeliver() {
return false;
}
}

View File

@ -213,6 +213,10 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
// we always have space - as we can persist to disk
return false;
}
public boolean hasMessagesBufferedToDeliver() {
return !isEmpty();
}
public void setUsageManager(UsageManager usageManager){
super.setUsageManager(usageManager);

View File

@ -166,4 +166,9 @@ public interface PendingMessageCursor extends Service{
* @return true if the cursor is full
*/
public boolean isFull();
/**
* @return true if the cursor has buffered messages ready to deliver
*/
public boolean hasMessagesBufferedToDeliver();
}

View File

@ -24,6 +24,7 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.commons.logging.Log;
@ -43,6 +44,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements
private MessageStore store;
private final LinkedList batchList=new LinkedList();
private Destination regionDestination;
private int size = 0;
/**
* @param topic
@ -68,26 +70,48 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements
* @return true if there are no pending messages
*/
public boolean isEmpty(){
return batchList.isEmpty();
return size <= 0;
}
public boolean hasMessagesBufferedToDeliver() {
return !batchList.isEmpty();
}
public synchronized int size(){
try {
return store.getMessageCount();
size = store.getMessageCount();
}catch(IOException e) {
log.error("Failed to get message count",e);
throw new RuntimeException(e);
}
return size;
}
public synchronized void addMessageLast(MessageReference node) throws Exception{
if(node!=null){
node.decrementReferenceCount();
}
size++;
}
public void addMessageFirst(MessageReference node) throws Exception{
if(node!=null){
node.decrementReferenceCount();
}
size++;
}
public void remove(){
size--;
}
public void remove(MessageReference node){
size--;
}
public synchronized boolean hasNext(){
if(isEmpty()){
if(batchList.isEmpty()){
try{
fillBatch();
}catch(Exception e){
@ -95,7 +119,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements
throw new RuntimeException(e);
}
}
return !isEmpty();
return !batchList.isEmpty();
}
public synchronized MessageReference next(){
@ -117,10 +141,15 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements
batchList.addLast(message);
}
public void recoverMessageReference(String messageReference)
throws Exception{
// shouldn't get called
throw new RuntimeException("Not supported");
public void recoverMessageReference(String messageReference) throws Exception{
Message msg=store.getMessage(new MessageId(messageReference));
if(msg!=null){
recoverMessage(msg);
}else{
String err = "Failed to retrieve message for id: "+messageReference;
log.error(err);
throw new IOException(err);
}
}
public void gc() {

View File

@ -37,6 +37,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
private QueueStorePrefetch persistent;
private boolean started;
private PendingMessageCursor currentCursor;
/**
* Construct
@ -48,6 +49,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
this.queue=queue;
this.tmpStore=tmpStore;
this.persistent=new QueueStorePrefetch(queue);
currentCursor = persistent;
}
public synchronized void start() throws Exception{
@ -134,7 +136,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
pendingCount--;
}
public void remove(MessageReference node){
public synchronized void remove(MessageReference node){
if (!node.isPersistent()) {
nonPersistent.remove(node);
}else {
@ -145,6 +147,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
public synchronized void reset(){
nonPersistent.reset();
persistent.reset();
}
public int size(){
@ -208,8 +211,12 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{
}
protected synchronized PendingMessageCursor getNextCursor() throws Exception{
if(currentCursor==null||currentCursor.isEmpty()){
if(currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()){
currentCursor=currentCursor==persistent?nonPersistent:persistent;
//sanity check
if (currentCursor.isEmpty()) {
currentCursor=currentCursor==persistent?nonPersistent:persistent;
}
}
return currentCursor;
}