diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index bfb5569873..bc3728b8d4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -42,7 +42,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements static private final Log log=LogFactory.getLog(QueueStorePrefetch.class); private MessageStore store; - private final LinkedList batchList=new LinkedList(); + private final LinkedList batchList=new LinkedList(); private Destination regionDestination; private int size = 0; @@ -123,7 +123,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements } public synchronized MessageReference next(){ - Message result = (Message)batchList.removeFirst(); + Message result = batchList.removeFirst(); result.setRegionDestination(regionDestination); return result; } @@ -137,7 +137,10 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements public void recoverMessage(Message message) throws Exception{ message.setRegionDestination(regionDestination); - message.incrementReferenceCount(); + // only increment if count is zero (could have been cached) + if(message.getReferenceCount()==0){ + message.incrementReferenceCount(); + } batchList.addLast(message); } @@ -153,6 +156,9 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements } public void gc() { + for (Message msg:batchList) { + msg.decrementReferenceCount(); + } batchList.clear(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 5b3c8b5417..a4096435a8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -42,7 +42,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ private String clientId; private String subscriberName; private Map topics=new HashMap(); - private LinkedList storePrefetches=new LinkedList(); + private LinkedList storePrefetches=new LinkedList(); private boolean started; private PendingMessageCursor nonPersistent; private PendingMessageCursor currentCursor; @@ -61,21 +61,24 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ } public synchronized void start() throws Exception{ - started=true; - for(Iterator i=storePrefetches.iterator();i.hasNext();){ - PendingMessageCursor tsp=(PendingMessageCursor)i.next(); - tsp.start(); - pendingCount+=tsp.size(); + if(!started){ + started=true; + for(PendingMessageCursor tsp: storePrefetches){ + tsp.start(); + pendingCount+=tsp.size(); + } } } public synchronized void stop() throws Exception{ - started=false; - for(Iterator i=storePrefetches.iterator();i.hasNext();){ - PendingMessageCursor tsp=(PendingMessageCursor)i.next(); - tsp.stop(); + if(started){ + started=false; + for(PendingMessageCursor tsp: storePrefetches){ + tsp.stop(); + } + + pendingCount=0; } - pendingCount=0; } /** @@ -119,12 +122,12 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ public synchronized boolean isEmpty(){ return pendingCount<=0; } - - public boolean isEmpty(Destination destination) { - boolean result = true; + + public boolean isEmpty(Destination destination){ + boolean result=true; TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(destination); if(tsp!=null){ - result = tsp.size() <= 0; + result=tsp.size()<=0; } return result; } @@ -140,7 +143,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ return false; } - public synchronized void addMessageLast(MessageReference node) throws Exception{ if(node!=null){ Message msg=node.getMessage(); @@ -159,7 +161,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ } } } - + public void addRecoveredMessage(MessageReference node) throws Exception{ nonPersistent.addMessageLast(node); } @@ -178,12 +180,13 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ throw new RuntimeException(e); } result=currentCursor!=null?currentCursor.hasNext():false; - } + } return result; } public synchronized MessageReference next(){ - return currentCursor!=null?currentCursor.next():null; + MessageReference result = currentCursor!=null?currentCursor.next():null; + return result; } public synchronized void remove(){ @@ -192,7 +195,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ } pendingCount--; } - + public void remove(MessageReference node){ if(currentCursor!=null){ currentCursor.remove(node); @@ -206,7 +209,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ tsp.reset(); } } - + public synchronized void release(){ for(Iterator i=storePrefetches.iterator();i.hasNext();){ AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next(); @@ -217,7 +220,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ public int size(){ return pendingCount; } - + public synchronized void setMaxBatchSize(int maxBatchSize){ for(Iterator i=storePrefetches.iterator();i.hasNext();){ AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next(); @@ -225,14 +228,14 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ } super.setMaxBatchSize(maxBatchSize); } - - public synchronized void gc() { + + public synchronized void gc(){ for(Iterator i=storePrefetches.iterator();i.hasNext();){ PendingMessageCursor tsp=(PendingMessageCursor)i.next(); tsp.gc(); } } - + public synchronized void setUsageManager(UsageManager usageManager){ super.setUsageManager(usageManager); for(Iterator i=storePrefetches.iterator();i.hasNext();){ @@ -245,7 +248,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ if(currentCursor==null||currentCursor.isEmpty()){ currentCursor=null; for(Iterator i=storePrefetches.iterator();i.hasNext();){ - AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next(); + AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next(); if(tsp.hasNext()){ currentCursor=tsp; break; @@ -256,4 +259,8 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ } return currentCursor; } + + public String toString(){ + return "StoreDurableSubscriber("+clientId+":"+subscriberName+")"; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index cb6ed4cae3..6082a6c131 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -1,26 +1,21 @@ /** * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.activemq.broker.region.cursors; import java.io.IOException; import java.util.LinkedList; - import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Topic; @@ -32,25 +27,21 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** - * perist pending messages pending message (messages awaiting disptach to a - * consumer) cursor + * perist pending messages pending message (messages awaiting disptach to a consumer) cursor * * @version $Revision$ */ -class TopicStorePrefetch extends AbstractPendingMessageCursor implements - MessageRecoveryListener { +class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener{ static private final Log log=LogFactory.getLog(TopicStorePrefetch.class); - private TopicMessageStore store; - private final LinkedList batchList=new LinkedList(); + private final LinkedList batchList=new LinkedList(); private String clientId; private String subscriberName; private Destination regionDestination; - boolean empty; - private MessageId firstMessageId; - private MessageId lastMessageId; + private MessageId firstMessageId; + private MessageId lastMessageId; /** * @param topic @@ -59,13 +50,13 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements * @throws IOException */ public TopicStorePrefetch(Topic topic,String clientId,String subscriberName){ - this.regionDestination = topic; + this.regionDestination=topic; this.store=(TopicMessageStore)topic.getMessageStore(); this.clientId=clientId; this.subscriberName=subscriberName; } - public void start() throws Exception { + public void start() throws Exception{ if(batchList.isEmpty()){ try{ fillBatch(); @@ -73,8 +64,8 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements log.error("Failed to fill batch",e); throw new RuntimeException(e); } - empty = batchList.isEmpty(); - } + empty=batchList.isEmpty(); + } } public void stop() throws Exception{ @@ -88,66 +79,63 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements public boolean isEmpty(){ return empty; } - + public synchronized int size(){ try{ return store.getMessageCount(clientId,subscriberName); }catch(IOException e){ - log.error(this + " Failed to get the outstanding message count from the store",e); + log.error(this+" Failed to get the outstanding message count from the store",e); throw new RuntimeException(e); } } - + public synchronized void addMessageLast(MessageReference node) throws Exception{ - if(node!=null){ - if( empty ) { - firstMessageId = node.getMessageId(); - empty=false; - } - lastMessageId = node.getMessageId(); + if(node!=null){ + if(empty){ + firstMessageId=node.getMessageId(); + empty=false; + } + lastMessageId=node.getMessageId(); node.decrementReferenceCount(); } } - public synchronized boolean hasNext() { + public synchronized boolean hasNext(){ return !isEmpty(); } public synchronized MessageReference next(){ - - if( empty ) { - return null; - } else { - - // We may need to fill in the batch... + Message result=null; + if(!empty){ if(batchList.isEmpty()){ try{ fillBatch(); - }catch(Exception e){ + }catch(final Exception e){ log.error("Failed to fill batch",e); throw new RuntimeException(e); } - if( batchList.isEmpty()) { - return null; + if(batchList.isEmpty()){ + return null; + } + } + if(!batchList.isEmpty()){ + result=batchList.removeFirst(); + if(firstMessageId!=null){ + // Skip messages until we get to the first message. + if(!result.getMessageId().equals(firstMessageId)) + result=null; + firstMessageId=null; + }else{ + if(lastMessageId!=null){ + if(result.getMessageId().equals(lastMessageId)){ + empty=true; + } + } + result.setRegionDestination(regionDestination); } } - - Message result = (Message)batchList.removeFirst(); - - if( firstMessageId != null ) { - // Skip messages until we get to the first message. - if( !result.getMessageId().equals(firstMessageId) ) - return null; - firstMessageId = null; - } - if( lastMessageId != null ) { - if( result.getMessageId().equals(lastMessageId) ) { - empty=true; - } - } - result.setRegionDestination(regionDestination); - return result; } + return result; } public void reset(){ @@ -159,12 +147,14 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements public void recoverMessage(Message message) throws Exception{ message.setRegionDestination(regionDestination); - message.incrementReferenceCount(); + // only increment if count is zero (could have been cached) + if(message.getReferenceCount()==0){ + message.incrementReferenceCount(); + } batchList.addLast(message); } - public void recoverMessageReference(MessageId messageReference) - throws Exception{ + public void recoverMessageReference(MessageId messageReference) throws Exception{ // shouldn't get called throw new RuntimeException("Not supported"); } @@ -173,13 +163,15 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements protected void fillBatch() throws Exception{ store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this); } - - public void gc() { + + public void gc(){ + for(Message msg:batchList){ + msg.decrementReferenceCount(); + } batchList.clear(); } - - public String toString() { - return "TopicStorePrefetch" + System.identityHashCode(this) + "("+clientId+","+subscriberName+")"; + + public String toString(){ + return "TopicStorePrefetch"+System.identityHashCode(this)+"("+clientId+","+subscriberName+")"; } - }