fixes for memory leaks

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@392904 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-04-10 08:12:09 +00:00
parent fdd4b4e3e8
commit b792f11767
5 changed files with 45 additions and 33 deletions

View File

@ -348,8 +348,9 @@
<!-- This test currently fails --> <!-- This test currently fails -->
<exclude>**/ItStillMarshallsTheSameTest.*</exclude> <exclude>**/ItStillMarshallsTheSameTest.*</exclude>
<!-- This test currently fails --> <!-- Kaha in flux - removing tests -->
<exclude>**/KahaXARecoveryBrokerTest.*</exclude> <exclude>**/KahaXARecoveryBrokerTest.*</exclude>
<exclude>**/KahaRecoveryBrokerTest.*</exclude>
<!-- https://issues.apache.org/activemq/browse/AMQ-522 --> <!-- https://issues.apache.org/activemq/browse/AMQ-522 -->
<exclude>**/ProxyConnectorTest.*</exclude> <exclude>**/ProxyConnectorTest.*</exclude>

View File

@ -63,6 +63,8 @@ public class AdvisoryBroker extends BrokerFilter {
advisoryProducerId.setConnectionId(idGenerator.generateId()); advisoryProducerId.setConnectionId(idGenerator.generateId());
} }
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
next.addConnection(context, info); next.addConnection(context, info);
@ -149,11 +151,14 @@ public class AdvisoryBroker extends BrokerFilter {
next.removeDestination(context, destination, timeout); next.removeDestination(context, destination, timeout);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
DestinationInfo info = (DestinationInfo) destinations.remove(destination); DestinationInfo info = (DestinationInfo) destinations.remove(destination);
if( info !=null ) { if( info !=null && info.getDestination() != null && topic != null) {
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
fireAdvisory(context, topic, info); fireAdvisory(context, topic, info);
next.removeDestination(context,topic,timeout);
next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), timeout);
next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), timeout);
} }
next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), timeout);
} }
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{

View File

@ -90,26 +90,25 @@ abstract public class AbstractRegion implements Region {
} }
} }
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
throws Exception { throws Exception{
// The destination cannot be removed if there are any active subscriptions // The destination cannot be removed if there are any active subscriptions
for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
Subscription sub = (Subscription) iter.next(); Subscription sub=(Subscription) iter.next();
if( sub.matches(destination) ) { if(sub.matches(destination)){
throw new JMSException("Destination still has an active subscription: "+ destination); throw new JMSException("Destination still has an active subscription: "+destination);
} }
} }
log.debug("Removing destination: "+destination); log.debug("Removing destination: "+destination);
synchronized(destinationsMutex){ synchronized(destinationsMutex){
Destination dest=(Destination) destinations.remove(destination); Destination dest=(Destination) destinations.remove(destination);
if(dest==null) if(dest!=null){
throw new IllegalArgumentException("The destination does not exist: "+destination); destinationMap.removeAll(destination);
dest.dispose(context);
destinationMap.removeAll(destination); dest.stop();
dest.dispose(context); }else{
dest.stop(); log.debug("Destination doesn't exist: " + dest);
}
} }
} }

View File

@ -58,7 +58,7 @@ public class Topic implements Destination {
protected final ActiveMQDestination destination; protected final ActiveMQDestination destination;
protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
protected final Valve dispatchValve = new Valve(true); protected final Valve dispatchValve = new Valve(true);
protected final TopicMessageStore store; protected final TopicMessageStore store;//this could be NULL! (If an advsiory)
protected final UsageManager usageManager; protected final UsageManager usageManager;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
@ -72,7 +72,7 @@ public class Topic implements Destination {
TaskRunnerFactory taskFactory) { TaskRunnerFactory taskFactory) {
this.destination = destination; this.destination = destination;
this.store = store; this.store = store; //this could be NULL! (If an advsiory)
this.usageManager = new UsageManager(memoryManager); this.usageManager = new UsageManager(memoryManager);
this.usageManager.setLimit(Long.MAX_VALUE); this.usageManager.setLimit(Long.MAX_VALUE);
@ -287,7 +287,7 @@ public class Topic implements Destination {
} }
public Message loadMessage(MessageId messageId) throws IOException { public Message loadMessage(MessageId messageId) throws IOException {
return store.getMessage(messageId); return store != null ? store.getMessage(messageId) : null;
} }
public void start() throws Exception { public void start() throws Exception {
@ -301,19 +301,21 @@ public class Topic implements Destination {
public Message[] browse(){ public Message[] browse(){
final Set result=new CopyOnWriteArraySet(); final Set result=new CopyOnWriteArraySet();
try{ try{
store.recover(new MessageRecoveryListener(){ if(store!=null){
public void recoverMessage(Message message) throws Exception{ store.recover(new MessageRecoveryListener(){
result.add(message); public void recoverMessage(Message message) throws Exception{
} result.add(message);
}
public void recoverMessageReference(String messageReference) throws Exception{} public void recoverMessageReference(String messageReference) throws Exception{}
public void finished(){} public void finished(){}
}); });
Message[] msgs=subscriptionRecoveryPolicy.browse(getActiveMQDestination()); Message[] msgs=subscriptionRecoveryPolicy.browse(getActiveMQDestination());
if(msgs!=null){ if(msgs!=null){
for(int i=0;i<msgs.length;i++){ for(int i=0;i<msgs.length;i++){
result.add(msgs[i]); result.add(msgs[i]);
}
} }
} }
}catch(Throwable e){ }catch(Throwable e){

View File

@ -22,6 +22,7 @@ import java.util.Set;
import javax.jms.InvalidDestinationException; import javax.jms.InvalidDestinationException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -154,7 +155,11 @@ public class TopicRegion extends AbstractRegion {
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
TopicMessageStore store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination); TopicMessageStore store = null;
if (!AdvisorySupport.isAdvisoryTopic(destination)){
store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
}
Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory); Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination); configureTopic(topic, destination);