diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java new file mode 100644 index 0000000000..4b902480e6 --- /dev/null +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.pool; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; + +/** + * A {@link MessageConsumer} which was created by {@link PooledSession}. + */ +public class PooledMessageConsumer implements MessageConsumer { + + private final PooledSession session; + private final MessageConsumer delegate; + + /** + * Wraps the message consumer. + * + * @param session the pooled session + * @param delegate the created consumer to wrap + */ + public PooledMessageConsumer(PooledSession session, MessageConsumer delegate) { + this.session = session; + this.delegate = delegate; + } + + @Override + public void close() throws JMSException { + // ensure session removes consumer as its closed now + session.onConsumerClose(delegate); + delegate.close(); + } + + @Override + public MessageListener getMessageListener() throws JMSException { + return delegate.getMessageListener(); + } + + @Override + public String getMessageSelector() throws JMSException { + return delegate.getMessageSelector(); + } + + @Override + public Message receive() throws JMSException { + return delegate.receive(); + } + + @Override + public Message receive(long timeout) throws JMSException { + return delegate.receive(timeout); + } + + @Override + public Message receiveNoWait() throws JMSException { + return delegate.receiveNoWait(); + } + + @Override + public void setMessageListener(MessageListener listener) throws JMSException { + delegate.setMessageListener(listener); + } + + @Override + public String toString() { + return delegate.toString(); + } +} diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java index bafaf16966..aced4f7685 100644 --- a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java @@ -296,6 +296,18 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes return new PooledTopicPublisher(getTopicPublisher(), topic); } + /** + * Callback invoked when the consumer is closed. + *

+ * This is used to keep track of an explicit closed consumer created by this session, + * by which we know do not need to keep track of the consumer, as its already closed. + * + * @param consumer the consumer which is being closed + */ + protected void onConsumerClose(MessageConsumer consumer) { + consumers.remove(consumer); + } + public ActiveMQSession getInternalSession() throws AlreadyClosedException { if (session == null) { throw new AlreadyClosedException("The session has already been closed"); @@ -331,7 +343,10 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes private MessageConsumer addConsumer(MessageConsumer consumer) { consumers.add(consumer); - return consumer; + // must wrap in PooledMessageConsumer to ensure the onConsumerClose method is invoked + // when the returned consumer is closed, to avoid memory leak in this session class + // in case many consumers is created + return new PooledMessageConsumer(this, consumer); } private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) { @@ -344,11 +359,11 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes return receiver; } - public String toString() { - return "PooledSession { " + session + " }"; - } - public void setIsXa(boolean isXa) { this.isXa = isXa; } + + public String toString() { + return "PooledSession { " + session + " }"; + } }