- We now close the consumers created by a pooled session when the session is closed.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383014 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-04 03:41:51 +00:00
parent 7027fe66d7
commit c60ac2ff62
1 changed files with 46 additions and 11 deletions

View File

@ -26,6 +26,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.ObjectPool;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -50,6 +52,7 @@ import javax.jms.TopicSession;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import java.io.Serializable; import java.io.Serializable;
import java.util.Iterator;
/** /**
* @version $Revision: 1.1 $ * @version $Revision: 1.1 $
@ -64,6 +67,9 @@ public class PooledSession implements TopicSession, QueueSession {
private ActiveMQTopicPublisher topicPublisher; private ActiveMQTopicPublisher topicPublisher;
private boolean transactional = true; private boolean transactional = true;
private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList();
public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) { public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
this.session = aSession; this.session = aSession;
this.sessionPool = sessionPool; this.sessionPool = sessionPool;
@ -77,6 +83,17 @@ public class PooledSession implements TopicSession, QueueSession {
// lets reset the session // lets reset the session
getSession().setMessageListener(null); getSession().setMessageListener(null);
// Close any consumers and browsers that may have been created.
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
MessageConsumer consumer = (MessageConsumer) iter.next();
consumer.close();
}
for (Iterator iter = browsers.iterator(); iter.hasNext();) {
QueueBrowser browser = (QueueBrowser) iter.next();
browser.close();
}
// maybe do a rollback? // maybe do a rollback?
if (transactional) { if (transactional) {
try { try {
@ -182,31 +199,32 @@ public class PooledSession implements TopicSession, QueueSession {
// Consumer related methods // Consumer related methods
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
public QueueBrowser createBrowser(Queue queue) throws JMSException { public QueueBrowser createBrowser(Queue queue) throws JMSException {
return getSession().createBrowser(queue); return addQueueBrowser(getSession().createBrowser(queue));
} }
public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException { public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
return getSession().createBrowser(queue, selector); return addQueueBrowser(getSession().createBrowser(queue, selector));
} }
public MessageConsumer createConsumer(Destination destination) throws JMSException { public MessageConsumer createConsumer(Destination destination) throws JMSException {
return getSession().createConsumer(destination); return addConsumer(getSession().createConsumer(destination));
} }
public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
return getSession().createConsumer(destination, selector); return addConsumer(getSession().createConsumer(destination, selector));
} }
public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException { public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
return getSession().createConsumer(destination, selector, noLocal); return addConsumer(getSession().createConsumer(destination, selector, noLocal));
} }
public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException { public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
return getSession().createDurableSubscriber(topic, selector); return addTopicSubscriber(getSession().createDurableSubscriber(topic, selector));
} }
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
return getSession().createDurableSubscriber(topic, name, selector, noLocal); return addTopicSubscriber(getSession().createDurableSubscriber(topic, name, selector, noLocal));
} }
public MessageListener getMessageListener() throws JMSException { public MessageListener getMessageListener() throws JMSException {
@ -218,19 +236,19 @@ public class PooledSession implements TopicSession, QueueSession {
} }
public TopicSubscriber createSubscriber(Topic topic) throws JMSException { public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
return getSession().createSubscriber(topic); return addTopicSubscriber(getSession().createSubscriber(topic));
} }
public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException { public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
return getSession().createSubscriber(topic, selector, local); return addTopicSubscriber(getSession().createSubscriber(topic, selector, local));
} }
public QueueReceiver createReceiver(Queue queue) throws JMSException { public QueueReceiver createReceiver(Queue queue) throws JMSException {
return getSession().createReceiver(queue); return addQueueReceiver(getSession().createReceiver(queue));
} }
public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException { public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
return getSession().createReceiver(queue, selector); return addQueueReceiver(getSession().createReceiver(queue, selector));
} }
@ -278,6 +296,23 @@ public class PooledSession implements TopicSession, QueueSession {
return topicPublisher; return topicPublisher;
} }
private QueueBrowser addQueueBrowser(QueueBrowser browser) {
browsers.add(browser);
return browser;
}
private MessageConsumer addConsumer(MessageConsumer consumer) {
consumers.add(consumer);
return consumer;
}
private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
consumers.add(subscriber);
return subscriber;
}
private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
consumers.add(receiver);
return receiver;
}
public String toString() { public String toString() {
return "PooledSession { "+session+" }"; return "PooledSession { "+session+" }";
} }