tighten up on message reference counting

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@509563 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-02-20 13:59:41 +00:00
parent 8de60cf980
commit c46003fd3b
3 changed files with 103 additions and 98 deletions

View File

@ -42,7 +42,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements
static private final Log log=LogFactory.getLog(QueueStorePrefetch.class);
private MessageStore store;
private final LinkedList batchList=new LinkedList();
private final LinkedList <Message>batchList=new LinkedList<Message>();
private Destination regionDestination;
private int size = 0;
@ -123,7 +123,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements
}
public synchronized MessageReference next(){
Message result = (Message)batchList.removeFirst();
Message result = batchList.removeFirst();
result.setRegionDestination(regionDestination);
return result;
}
@ -137,7 +137,10 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements
public void recoverMessage(Message message) throws Exception{
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
// only increment if count is zero (could have been cached)
if(message.getReferenceCount()==0){
message.incrementReferenceCount();
}
batchList.addLast(message);
}
@ -153,6 +156,9 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements
}
public void gc() {
for (Message msg:batchList) {
msg.decrementReferenceCount();
}
batchList.clear();
}

View File

@ -42,7 +42,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
private String clientId;
private String subscriberName;
private Map topics=new HashMap();
private LinkedList storePrefetches=new LinkedList();
private LinkedList <PendingMessageCursor>storePrefetches=new LinkedList<PendingMessageCursor>();
private boolean started;
private PendingMessageCursor nonPersistent;
private PendingMessageCursor currentCursor;
@ -61,21 +61,24 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
}
public synchronized void start() throws Exception{
started=true;
for(Iterator i=storePrefetches.iterator();i.hasNext();){
PendingMessageCursor tsp=(PendingMessageCursor)i.next();
tsp.start();
pendingCount+=tsp.size();
if(!started){
started=true;
for(PendingMessageCursor tsp: storePrefetches){
tsp.start();
pendingCount+=tsp.size();
}
}
}
public synchronized void stop() throws Exception{
started=false;
for(Iterator i=storePrefetches.iterator();i.hasNext();){
PendingMessageCursor tsp=(PendingMessageCursor)i.next();
tsp.stop();
if(started){
started=false;
for(PendingMessageCursor tsp: storePrefetches){
tsp.stop();
}
pendingCount=0;
}
pendingCount=0;
}
/**
@ -119,12 +122,12 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
public synchronized boolean isEmpty(){
return pendingCount<=0;
}
public boolean isEmpty(Destination destination) {
boolean result = true;
public boolean isEmpty(Destination destination){
boolean result=true;
TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(destination);
if(tsp!=null){
result = tsp.size() <= 0;
result=tsp.size()<=0;
}
return result;
}
@ -140,7 +143,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
return false;
}
public synchronized void addMessageLast(MessageReference node) throws Exception{
if(node!=null){
Message msg=node.getMessage();
@ -159,7 +161,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
}
}
}
public void addRecoveredMessage(MessageReference node) throws Exception{
nonPersistent.addMessageLast(node);
}
@ -178,12 +180,13 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
throw new RuntimeException(e);
}
result=currentCursor!=null?currentCursor.hasNext():false;
}
}
return result;
}
public synchronized MessageReference next(){
return currentCursor!=null?currentCursor.next():null;
MessageReference result = currentCursor!=null?currentCursor.next():null;
return result;
}
public synchronized void remove(){
@ -192,7 +195,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
}
pendingCount--;
}
public void remove(MessageReference node){
if(currentCursor!=null){
currentCursor.remove(node);
@ -206,7 +209,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
tsp.reset();
}
}
public synchronized void release(){
for(Iterator i=storePrefetches.iterator();i.hasNext();){
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
@ -217,7 +220,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
public int size(){
return pendingCount;
}
public synchronized void setMaxBatchSize(int maxBatchSize){
for(Iterator i=storePrefetches.iterator();i.hasNext();){
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
@ -225,14 +228,14 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
}
super.setMaxBatchSize(maxBatchSize);
}
public synchronized void gc() {
public synchronized void gc(){
for(Iterator i=storePrefetches.iterator();i.hasNext();){
PendingMessageCursor tsp=(PendingMessageCursor)i.next();
tsp.gc();
}
}
public synchronized void setUsageManager(UsageManager usageManager){
super.setUsageManager(usageManager);
for(Iterator i=storePrefetches.iterator();i.hasNext();){
@ -245,7 +248,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
if(currentCursor==null||currentCursor.isEmpty()){
currentCursor=null;
for(Iterator i=storePrefetches.iterator();i.hasNext();){
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
if(tsp.hasNext()){
currentCursor=tsp;
break;
@ -256,4 +259,8 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
}
return currentCursor;
}
public String toString(){
return "StoreDurableSubscriber("+clientId+":"+subscriberName+")";
}
}

View File

@ -1,26 +1,21 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.LinkedList;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
@ -32,25 +27,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* perist pending messages pending message (messages awaiting disptach to a
* consumer) cursor
* perist pending messages pending message (messages awaiting disptach to a consumer) cursor
*
* @version $Revision$
*/
class TopicStorePrefetch extends AbstractPendingMessageCursor implements
MessageRecoveryListener {
class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener{
static private final Log log=LogFactory.getLog(TopicStorePrefetch.class);
private TopicMessageStore store;
private final LinkedList batchList=new LinkedList();
private final LinkedList<Message> batchList=new LinkedList<Message>();
private String clientId;
private String subscriberName;
private Destination regionDestination;
boolean empty;
private MessageId firstMessageId;
private MessageId lastMessageId;
private MessageId firstMessageId;
private MessageId lastMessageId;
/**
* @param topic
@ -59,13 +50,13 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
* @throws IOException
*/
public TopicStorePrefetch(Topic topic,String clientId,String subscriberName){
this.regionDestination = topic;
this.regionDestination=topic;
this.store=(TopicMessageStore)topic.getMessageStore();
this.clientId=clientId;
this.subscriberName=subscriberName;
}
public void start() throws Exception {
public void start() throws Exception{
if(batchList.isEmpty()){
try{
fillBatch();
@ -73,8 +64,8 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
log.error("Failed to fill batch",e);
throw new RuntimeException(e);
}
empty = batchList.isEmpty();
}
empty=batchList.isEmpty();
}
}
public void stop() throws Exception{
@ -88,66 +79,63 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
public boolean isEmpty(){
return empty;
}
public synchronized int size(){
try{
return store.getMessageCount(clientId,subscriberName);
}catch(IOException e){
log.error(this + " Failed to get the outstanding message count from the store",e);
log.error(this+" Failed to get the outstanding message count from the store",e);
throw new RuntimeException(e);
}
}
public synchronized void addMessageLast(MessageReference node) throws Exception{
if(node!=null){
if( empty ) {
firstMessageId = node.getMessageId();
empty=false;
}
lastMessageId = node.getMessageId();
if(node!=null){
if(empty){
firstMessageId=node.getMessageId();
empty=false;
}
lastMessageId=node.getMessageId();
node.decrementReferenceCount();
}
}
public synchronized boolean hasNext() {
public synchronized boolean hasNext(){
return !isEmpty();
}
public synchronized MessageReference next(){
if( empty ) {
return null;
} else {
// We may need to fill in the batch...
Message result=null;
if(!empty){
if(batchList.isEmpty()){
try{
fillBatch();
}catch(Exception e){
}catch(final Exception e){
log.error("Failed to fill batch",e);
throw new RuntimeException(e);
}
if( batchList.isEmpty()) {
return null;
if(batchList.isEmpty()){
return null;
}
}
if(!batchList.isEmpty()){
result=batchList.removeFirst();
if(firstMessageId!=null){
// Skip messages until we get to the first message.
if(!result.getMessageId().equals(firstMessageId))
result=null;
firstMessageId=null;
}else{
if(lastMessageId!=null){
if(result.getMessageId().equals(lastMessageId)){
empty=true;
}
}
result.setRegionDestination(regionDestination);
}
}
Message result = (Message)batchList.removeFirst();
if( firstMessageId != null ) {
// Skip messages until we get to the first message.
if( !result.getMessageId().equals(firstMessageId) )
return null;
firstMessageId = null;
}
if( lastMessageId != null ) {
if( result.getMessageId().equals(lastMessageId) ) {
empty=true;
}
}
result.setRegionDestination(regionDestination);
return result;
}
return result;
}
public void reset(){
@ -159,12 +147,14 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
public void recoverMessage(Message message) throws Exception{
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
// only increment if count is zero (could have been cached)
if(message.getReferenceCount()==0){
message.incrementReferenceCount();
}
batchList.addLast(message);
}
public void recoverMessageReference(MessageId messageReference)
throws Exception{
public void recoverMessageReference(MessageId messageReference) throws Exception{
// shouldn't get called
throw new RuntimeException("Not supported");
}
@ -173,13 +163,15 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
protected void fillBatch() throws Exception{
store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this);
}
public void gc() {
public void gc(){
for(Message msg:batchList){
msg.decrementReferenceCount();
}
batchList.clear();
}
public String toString() {
return "TopicStorePrefetch" + System.identityHashCode(this) + "("+clientId+","+subscriberName+")";
public String toString(){
return "TopicStorePrefetch"+System.identityHashCode(this)+"("+clientId+","+subscriberName+")";
}
}