tidied up the way messages are page in for a durable subscriber

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@512640 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-02-28 07:23:37 +00:00
parent 14c605feab
commit dc0241343a
2 changed files with 89 additions and 32 deletions

View File

@ -168,6 +168,10 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
public void clear(){
pendingCount=0;
nonPersistent.clear();
for(PendingMessageCursor tsp: storePrefetches){
tsp.clear();
}
}
public synchronized boolean hasNext(){

View File

@ -27,7 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* perist pending messages pending message (messages awaiting disptach to a consumer) cursor
* perist pendingCount messages pendingCount message (messages awaiting disptach to a consumer) cursor
*
* @version $Revision$
*/
@ -39,15 +39,15 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
private String clientId;
private String subscriberName;
private Destination regionDestination;
boolean empty;
private MessageId firstMessageId;
private MessageId lastMessageId;
private int pendingCount;
private boolean started;
/**
* @param topic
* @param clientId
* @param subscriberName
* @throws IOException
*/
public TopicStorePrefetch(Topic topic,String clientId,String subscriberName){
this.regionDestination=topic;
@ -56,57 +56,78 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
this.subscriberName=subscriberName;
}
public synchronized void start() throws Exception{
if(batchList.isEmpty()){
public synchronized void start(){
if(!started){
started=true;
pendingCount=getStoreSize();
try{
fillBatch();
}catch(Exception e){
log.error("Failed to fill batch",e);
throw new RuntimeException(e);
}
empty=batchList.isEmpty();
}
}
public synchronized void stop() throws Exception{
public synchronized void stop(){
if(started){
started=false;
store.resetBatching(clientId,subscriberName);
gc();
}
}
/**
* @return true if there are no pending messages
* @return true if there are no pendingCount messages
*/
public boolean isEmpty(){
return empty;
return pendingCount <= 0;
}
public synchronized int size(){
try{
return store.getMessageCount(clientId,subscriberName);
}catch(IOException e){
log.error(this+" Failed to get the outstanding message count from the store",e);
throw new RuntimeException(e);
}
return getPendingCount();
}
public synchronized void addMessageLast(MessageReference node) throws Exception{
if(node!=null){
if(empty){
if(isEmpty() && started){
firstMessageId=node.getMessageId();
empty=false;
}
lastMessageId=node.getMessageId();
node.decrementReferenceCount();
pendingCount++;
}
}
public void addMessageFirst(MessageReference node) throws Exception{
if(node!=null){
if(started){
firstMessageId=node.getMessageId();
}
node.decrementReferenceCount();
pendingCount++;
}
}
public synchronized void remove(){
pendingCount--;
}
public synchronized void remove(MessageReference node){
pendingCount--;
}
public synchronized void clear(){
pendingCount=0;
}
public synchronized boolean hasNext(){
return !isEmpty();
}
public synchronized MessageReference next(){
Message result=null;
if(!empty){
if(!isEmpty()){
if(batchList.isEmpty()){
try{
fillBatch();
@ -120,21 +141,14 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
}
if(!batchList.isEmpty()){
result=batchList.removeFirst();
if(firstMessageId!=null){
// Skip messages until we get to the first message.
if(!result.getMessageId().equals(firstMessageId))
result=null;
firstMessageId=null;
}else{
if(lastMessageId!=null){
if(result.getMessageId().equals(lastMessageId)){
empty=true;
//pendingCount=0;
}
}
result.setRegionDestination(regionDestination);
}
}
}
return result;
}
@ -161,8 +175,47 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
// implementation
protected synchronized void fillBatch() throws Exception{
if(!isEmpty()){
store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this);
if(firstMessageId!=null){
int pos=0;
for(Message msg:batchList){
if(msg.getMessageId().equals(firstMessageId)){
firstMessageId=null;
break;
}
pos++;
}
if(pos>0){
for(int i=0;i<pos&&!batchList.isEmpty();i++){
batchList.removeFirst();
}
if(batchList.isEmpty()){
log.warn("Refilling batch - haven't got past first message = " + firstMessageId);
fillBatch();
}
}
}
}
}
protected synchronized int getPendingCount(){
if(pendingCount <= 0){
pendingCount = getStoreSize();
}
return pendingCount;
}
protected synchronized int getStoreSize(){
try{
return store.getMessageCount(clientId,subscriberName);
}catch(IOException e){
log.error(this+" Failed to get the outstanding message count from the store",e);
throw new RuntimeException(e);
}
}
public synchronized void gc(){
for(Message msg:batchList){