fix for memory leaks woth non-persistent messages - see http://www.nabble.com/OutOfMemoryErrors-again-tf3083798.html

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@499760 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-01-25 12:23:57 +00:00
parent 1b8148cd37
commit b036d4d2fa
4 changed files with 212 additions and 148 deletions

View File

@ -1,36 +1,29 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@ -44,40 +37,38 @@ import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.atomic.AtomicLong;
public class TopicSubscription extends AbstractSubscription{
private static final Log log=LogFactory.getLog(TopicSubscription.class);
private static final AtomicLong cursorNameCounter=new AtomicLong(0);
final protected FilePendingMessageCursor matched;
final protected ActiveMQDestination dlqDestination=new ActiveMQQueue("ActiveMQ.DLQ");
final protected UsageManager usageManager;
protected AtomicLong dispatched=new AtomicLong();
protected AtomicLong delivered=new AtomicLong();
private int maximumPendingMessages=-1;
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
private int discarded = 0;
private MessageEvictionStrategy messageEvictionStrategy=new OldestMessageEvictionStrategy();
private int discarded=0;
private final Object matchedListMutex=new Object();
private final AtomicLong enqueueCounter = new AtomicLong(0);
private final AtomicLong dequeueCounter = new AtomicLong(0);
private final AtomicLong enqueueCounter=new AtomicLong(0);
private final AtomicLong dequeueCounter=new AtomicLong(0);
boolean singleDestination=true;
Destination destination;
private int memoryUsageHighWaterMark=95;
public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
throws InvalidSelectorException{
super(broker,context,info);
this.usageManager=usageManager;
this.matched = new FilePendingMessageCursor(info.getConsumerId().toString(), broker.getTempDataStore());
String matchedName="TopicSubscription:"+cursorNameCounter.getAndIncrement()+"["+info.getConsumerId().toString()
+"]";
this.matched=new FilePendingMessageCursor(matchedName,broker.getTempDataStore());
this.matched.setUsageManager(usageManager);
this.matched.start();
}
public void add(MessageReference node) throws InterruptedException,IOException{
enqueueCounter.incrementAndGet();
node.incrementReferenceCount();
if(!isFull()&&!isSlaveBroker()){
optimizePrefetch();
// if maximumPendingMessages is set we will only discard messages which
@ -88,40 +79,37 @@ public class TopicSubscription extends AbstractSubscription{
synchronized(matchedListMutex){
matched.addMessageLast(node);
// NOTE - be careful about the slaveBroker!
if (maximumPendingMessages > 0) {
if(maximumPendingMessages>0){
// calculate the high water mark from which point we will eagerly evict expired messages
int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
max = maximumPendingMessages;
int max=messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
if(maximumPendingMessages>0&&maximumPendingMessages<max){
max=maximumPendingMessages;
}
if (!matched.isEmpty() && matched.size() > max) {
if(!matched.isEmpty()&&matched.size()>max){
removeExpiredMessages();
}
// lets discard old messages as we are a slow consumer
while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
int pageInSize = matched.size() - maximumPendingMessages;
//only page in a 1000 at a time - else we could blow da memory
pageInSize = Math.max(1000,pageInSize);
LinkedList list = matched.pageInList(pageInSize);
MessageReference[] oldMessages = messageEvictionStrategy.evictMessages(list);
int messagesToEvict = oldMessages.length;
for(int i = 0; i < messagesToEvict; i++) {
MessageReference oldMessage = oldMessages[i];
while(!matched.isEmpty()&&matched.size()>maximumPendingMessages){
int pageInSize=matched.size()-maximumPendingMessages;
// only page in a 1000 at a time - else we could blow da memory
pageInSize=Math.max(1000,pageInSize);
LinkedList list=matched.pageInList(pageInSize);
MessageReference[] oldMessages=messageEvictionStrategy.evictMessages(list);
int messagesToEvict=oldMessages.length;
for(int i=0;i<messagesToEvict;i++){
MessageReference oldMessage=oldMessages[i];
oldMessage.decrementReferenceCount();
matched.remove(oldMessage);
discarded++;
if (log.isDebugEnabled()) {
log.debug("Discarding message " + oldMessages[i]);
if(log.isDebugEnabled()){
log.debug("Discarding message "+oldMessages[i]);
}
}
// lets avoid an infinite loop if we are given a bad eviction strategy
// for a bad strategy lets just not evict
if (messagesToEvict == 0) {
log.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy);
if(messagesToEvict==0){
log.warn("No messages to evict returned from eviction strategy: "
+messageEvictionStrategy);
break;
}
}
@ -133,6 +121,7 @@ public class TopicSubscription extends AbstractSubscription{
/**
* Discard any expired messages from the matched list. Called from a synchronized block.
*
* @throws IOException
*/
protected void removeExpiredMessages() throws IOException{
@ -172,16 +161,16 @@ public class TopicSubscription extends AbstractSubscription{
}
synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
// Handle the standard acknowledgment case.
boolean wasFull=isFull();
if(ack.isStandardAck()||ack.isPoisonAck()){
if(context.isInTransaction()){
delivered.addAndGet(ack.getMessageCount());
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{
synchronized( TopicSubscription.this ) {
if( singleDestination ) {
synchronized(TopicSubscription.this){
if(singleDestination){
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
}
@ -191,11 +180,9 @@ public class TopicSubscription extends AbstractSubscription{
}
});
}else{
if( singleDestination ) {
if(singleDestination){
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
dequeueCounter.addAndGet(ack.getMessageCount());
dispatched.addAndGet(-ack.getMessageCount());
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
@ -215,7 +202,7 @@ public class TopicSubscription extends AbstractSubscription{
throw new JMSException("Invalid acknowledgment: "+ack);
}
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
public Response pullMessage(ConnectionContext context,MessagePull pull) throws Exception{
// not supported for topics
return null;
}
@ -232,11 +219,11 @@ public class TopicSubscription extends AbstractSubscription{
return maximumPendingMessages;
}
public long getDispatchedCounter() {
public long getDispatchedCounter(){
return dispatched.get();
}
public long getEnqueueCounter() {
public long getEnqueueCounter(){
return enqueueCounter.get();
}
@ -247,23 +234,22 @@ public class TopicSubscription extends AbstractSubscription{
/**
* @return the number of messages discarded due to being a slow consumer
*/
public int discarded() {
synchronized(matchedListMutex) {
public int discarded(){
synchronized(matchedListMutex){
return discarded;
}
}
/**
* @return the number of matched messages (messages targeted for the subscription but not
* yet able to be dispatched due to the prefetch buffer being full).
* @return the number of matched messages (messages targeted for the subscription but not yet able to be dispatched
* due to the prefetch buffer being full).
*/
public int matched() {
synchronized(matchedListMutex) {
public int matched(){
synchronized(matchedListMutex){
return matched.size();
}
}
/**
* Sets the maximum number of pending messages that can be matched against this consumer before old messages are
* discarded.
@ -272,22 +258,19 @@ public class TopicSubscription extends AbstractSubscription{
this.maximumPendingMessages=maximumPendingMessages;
}
public MessageEvictionStrategy getMessageEvictionStrategy() {
public MessageEvictionStrategy getMessageEvictionStrategy(){
return messageEvictionStrategy;
}
/**
* Sets the eviction strategy used to decide which message to evict when the slow consumer
* needs to discard messages
* Sets the eviction strategy used to decide which message to evict when the slow consumer needs to discard messages
*/
public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
this.messageEvictionStrategy = messageEvictionStrategy;
public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy){
this.messageEvictionStrategy=messageEvictionStrategy;
}
// Implementation methods
// -------------------------------------------------------------------------
private boolean isFull(){
return dispatched.get()-delivered.get()>=info.getPrefetchSize();
}
@ -296,23 +279,45 @@ public class TopicSubscription extends AbstractSubscription{
* @return true when 60% or more room is left for dispatching messages
*/
public boolean isLowWaterMark(){
return (dispatched.get()-delivered.get()) <= (info.getPrefetchSize() *.4);
return (dispatched.get()-delivered.get())<=(info.getPrefetchSize()*.4);
}
/**
* @return true when 10% or less room is left for dispatching messages
*/
public boolean isHighWaterMark(){
return (dispatched.get()-delivered.get()) >= (info.getPrefetchSize() *.9);
return (dispatched.get()-delivered.get())>=(info.getPrefetchSize()*.9);
}
/**
* @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
*/
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark){
this.memoryUsageHighWaterMark=memoryUsageHighWaterMark;
}
/**
* @return the memoryUsageHighWaterMark
*/
public int getMemoryUsageHighWaterMark(){
return this.memoryUsageHighWaterMark;
}
/**
* @return the usageManager
*/
public UsageManager getUsageManager(){
return this.usageManager;
}
/**
* inform the MessageConsumer on the client to change it's prefetch
*
* @param newPrefetch
*/
public void updateConsumerPrefetch(int newPrefetch){
if (context != null && context.getConnection() != null && context.getConnection().isManageable()){
ConsumerControl cc = new ConsumerControl();
if(context!=null&&context.getConnection()!=null&&context.getConnection().isManageable()){
ConsumerControl cc=new ConsumerControl();
cc.setConsumerId(info.getConsumerId());
cc.setPrefetch(newPrefetch);
context.getConnection().dispatchAsync(cc);
@ -325,17 +330,12 @@ public class TopicSubscription extends AbstractSubscription{
*/
public void optimizePrefetch(){
/*
if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
&&context.getConnection().isManageable()){
if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && isLowWaterMark()){
info.setCurrentPrefetchSize(info.getPrefetchSize());
updateConsumerPrefetch(info.getPrefetchSize());
}else if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){
// want to purge any outstanding acks held by the consumer
info.setCurrentPrefetchSize(1);
updateConsumerPrefetch(1);
}
}
* if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
* &&context.getConnection().isManageable()){ if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() &&
* isLowWaterMark()){ info.setCurrentPrefetchSize(info.getPrefetchSize());
* updateConsumerPrefetch(info.getPrefetchSize()); }else
* if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){ // want to purge any
* outstanding acks held by the consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } }
*/
}
@ -343,7 +343,7 @@ public class TopicSubscription extends AbstractSubscription{
synchronized(matchedListMutex){
try{
matched.reset();
while(matched.hasNext()){
while(matched.hasNext()&&!isFull()){
MessageReference message=(MessageReference)matched.next();
matched.remove();
// Message may have been sitting in the matched list a while
@ -361,27 +361,26 @@ public class TopicSubscription extends AbstractSubscription{
}
private void dispatch(final MessageReference node) throws IOException{
Message message=(Message) node;
Message message=(Message)node;
// Make sure we can dispatch a message.
MessageDispatch md=new MessageDispatch();
md.setMessage(message);
md.setConsumerId(info.getConsumerId());
md.setDestination(node.getRegionDestination().getActiveMQDestination());
dispatched.incrementAndGet();
// Keep track if this subscription is receiving messages from a single destination.
if( singleDestination ) {
if( destination == null ) {
destination = node.getRegionDestination();
} else {
if( destination != node.getRegionDestination() ) {
singleDestination = false;
if(singleDestination){
if(destination==null){
destination=node.getRegionDestination();
}else{
if(destination!=node.getRegionDestination()){
singleDestination=false;
}
}
}
if(info.isDispatchAsync()){
md.setConsumer(new Runnable(){
public void run(){
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.decrementReferenceCount();
@ -397,13 +396,13 @@ public class TopicSubscription extends AbstractSubscription{
public String toString(){
return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
+", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched()+", discarded="+discarded();
+", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched()
+", discarded="+discarded();
}
public void destroy() {
public void destroy(){
synchronized(matchedListMutex){
matched.destroy();
}
}
}

View File

@ -25,14 +25,14 @@ import org.apache.activemq.memory.UsageManager;
* @version $Revision$
*/
public class AbstractPendingMessageCursor implements PendingMessageCursor{
protected int memoryUsageHighWaterMark = 90;
protected int maxBatchSize=100;
protected UsageManager usageManager;
public void start() throws Exception{
public void start() throws Exception {
}
public void stop() throws Exception{
public void stop() throws Exception {
gc();
}
@ -112,7 +112,7 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor{
}
public boolean hasSpace() {
return usageManager != null ? !usageManager.isFull() : true;
return usageManager != null ? (usageManager.getPercentUsage() < memoryUsageHighWaterMark): true;
}
public boolean isFull() {
@ -126,4 +126,28 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor{
public boolean hasMessagesBufferedToDeliver() {
return false;
}
/**
* @return the memoryUsageHighWaterMark
*/
public int getMemoryUsageHighWaterMark(){
return this.memoryUsageHighWaterMark;
}
/**
* @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
*/
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark){
this.memoryUsageHighWaterMark=memoryUsageHighWaterMark;
}
/**
* @return the usageManager
*/
public UsageManager getUsageManager(){
return this.usageManager;
}
}

View File

@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.IndexTypes;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageListener;
@ -46,6 +47,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
private Destination regionDestination;
private AtomicBoolean iterating=new AtomicBoolean();
private boolean flushRequired;
private AtomicBoolean started=new AtomicBoolean();
/**
* @param name
@ -56,6 +58,23 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
this.store=store;
}
public void start(){
if(started.compareAndSet(false,true)){
if(usageManager!=null){
usageManager.addUsageListener(this);
}
}
}
public void stop(){
if(started.compareAndSet(true,false)){
gc();
if(usageManager!=null){
usageManager.removeUsageListener(this);
}
}
}
/**
* @return true if there are no pending messages
*/
@ -83,6 +102,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
public synchronized void destroy(){
stop();
for(Iterator i=memoryList.iterator();i.hasNext();){
Message node=(Message)i.next();
node.decrementReferenceCount();
@ -214,7 +234,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return false;
}
public boolean hasMessagesBufferedToDeliver() {
public boolean hasMessagesBufferedToDeliver(){
return !isEmpty();
}
@ -224,7 +244,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
if(newPercentUsage>=100){
if(newPercentUsage>=getMemoryUsageHighWaterMark()){
synchronized(iterating){
flushRequired=true;
if(!iterating.get()){
@ -240,13 +260,15 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
protected synchronized void flushToDisk(){
for(Iterator i=memoryList.iterator();i.hasNext();){
MessageReference node=(MessageReference)i.next();
if(!memoryList.isEmpty()){
while(!memoryList.isEmpty()){
MessageReference node=(MessageReference)memoryList.removeFirst();
node.decrementReferenceCount();
getDiskList().addLast(node);
}
memoryList.clear();
}
}
protected boolean isDiskListEmpty(){
return diskList==null||diskList.isEmpty();
@ -255,10 +277,10 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
protected ListContainer getDiskList(){
if(diskList==null){
try{
diskList=store.getListContainer(name);
diskList=store.getListContainer(name,"TopicSubscription",IndexTypes.DISK_INDEX);
diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
diskList.setMaximumCacheSize(0);
}catch(IOException e){
e.printStackTrace();
throw new RuntimeException(e);
}
}

View File

@ -162,6 +162,23 @@ public interface PendingMessageCursor extends Service{
*/
public void setUsageManager(UsageManager usageManager);
/**
* @return the usageManager
*/
public UsageManager getUsageManager();
/**
* @return the memoryUsageHighWaterMark
*/
public int getMemoryUsageHighWaterMark();
/**
* @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
*/
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark);
/**
* @return true if the cursor is full
*/
@ -171,4 +188,6 @@ public interface PendingMessageCursor extends Service{
* @return true if the cursor has buffered messages ready to deliver
*/
public boolean hasMessagesBufferedToDeliver();
}