AMQ-2095: Make pooled sessions implement XASession for specific transaction management

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@740476 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Guillaume Nodet 2009-02-03 22:10:59 +00:00
parent 1ab76f775d
commit 9a1f9c2fe3
4 changed files with 55 additions and 41 deletions

View File

@ -34,7 +34,7 @@ public class JcaConnectionPool extends XaConnectionPool {
} }
protected XAResource createXaResource(PooledSession session) throws JMSException { protected XAResource createXaResource(PooledSession session) throws JMSException {
XAResource xares = new LocalAndXATransaction(session.getSession().getTransactionContext()); XAResource xares = new LocalAndXATransaction(session.getInternalSession().getTransactionContext());
if (name != null) { if (name != null) {
xares = new WrapperNamedXAResource(xares, name); xares = new WrapperNamedXAResource(xares, name);
} }

View File

@ -42,6 +42,9 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher; import javax.jms.TopicPublisher;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import javax.jms.XASession;
import javax.jms.Session;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQMessageProducer; import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQQueueSender; import org.apache.activemq.ActiveMQQueueSender;
@ -54,7 +57,7 @@ import org.apache.commons.logging.LogFactory;
/** /**
* @version $Revision: 1.1 $ * @version $Revision: 1.1 $
*/ */
public class PooledSession implements TopicSession, QueueSession { public class PooledSession implements Session, TopicSession, QueueSession, XASession {
private static final transient Log LOG = LogFactory.getLog(PooledSession.class); private static final transient Log LOG = LogFactory.getLog(PooledSession.class);
private ActiveMQSession session; private ActiveMQSession session;
@ -87,7 +90,7 @@ public class PooledSession implements TopicSession, QueueSession {
// TODO a cleaner way to reset?? // TODO a cleaner way to reset??
// lets reset the session // lets reset the session
getSession().setMessageListener(null); getInternalSession().setMessageListener(null);
// Close any consumers and browsers that may have been created. // Close any consumers and browsers that may have been created.
for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) { for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
@ -105,7 +108,7 @@ public class PooledSession implements TopicSession, QueueSession {
// maybe do a rollback? // maybe do a rollback?
if (transactional) { if (transactional) {
try { try {
getSession().rollback(); getInternalSession().rollback();
} catch (JMSException e) { } catch (JMSException e) {
LOG.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e); LOG.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
@ -126,75 +129,86 @@ public class PooledSession implements TopicSession, QueueSession {
} }
public void commit() throws JMSException { public void commit() throws JMSException {
getSession().commit(); getInternalSession().commit();
} }
public BytesMessage createBytesMessage() throws JMSException { public BytesMessage createBytesMessage() throws JMSException {
return getSession().createBytesMessage(); return getInternalSession().createBytesMessage();
} }
public MapMessage createMapMessage() throws JMSException { public MapMessage createMapMessage() throws JMSException {
return getSession().createMapMessage(); return getInternalSession().createMapMessage();
} }
public Message createMessage() throws JMSException { public Message createMessage() throws JMSException {
return getSession().createMessage(); return getInternalSession().createMessage();
} }
public ObjectMessage createObjectMessage() throws JMSException { public ObjectMessage createObjectMessage() throws JMSException {
return getSession().createObjectMessage(); return getInternalSession().createObjectMessage();
} }
public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
return getSession().createObjectMessage(serializable); return getInternalSession().createObjectMessage(serializable);
} }
public Queue createQueue(String s) throws JMSException { public Queue createQueue(String s) throws JMSException {
return getSession().createQueue(s); return getInternalSession().createQueue(s);
} }
public StreamMessage createStreamMessage() throws JMSException { public StreamMessage createStreamMessage() throws JMSException {
return getSession().createStreamMessage(); return getInternalSession().createStreamMessage();
} }
public TemporaryQueue createTemporaryQueue() throws JMSException { public TemporaryQueue createTemporaryQueue() throws JMSException {
return getSession().createTemporaryQueue(); return getInternalSession().createTemporaryQueue();
} }
public TemporaryTopic createTemporaryTopic() throws JMSException { public TemporaryTopic createTemporaryTopic() throws JMSException {
return getSession().createTemporaryTopic(); return getInternalSession().createTemporaryTopic();
} }
public void unsubscribe(String s) throws JMSException { public void unsubscribe(String s) throws JMSException {
getSession().unsubscribe(s); getInternalSession().unsubscribe(s);
} }
public TextMessage createTextMessage() throws JMSException { public TextMessage createTextMessage() throws JMSException {
return getSession().createTextMessage(); return getInternalSession().createTextMessage();
} }
public TextMessage createTextMessage(String s) throws JMSException { public TextMessage createTextMessage(String s) throws JMSException {
return getSession().createTextMessage(s); return getInternalSession().createTextMessage(s);
} }
public Topic createTopic(String s) throws JMSException { public Topic createTopic(String s) throws JMSException {
return getSession().createTopic(s); return getInternalSession().createTopic(s);
} }
public int getAcknowledgeMode() throws JMSException { public int getAcknowledgeMode() throws JMSException {
return getSession().getAcknowledgeMode(); return getInternalSession().getAcknowledgeMode();
} }
public boolean getTransacted() throws JMSException { public boolean getTransacted() throws JMSException {
return getSession().getTransacted(); return getInternalSession().getTransacted();
} }
public void recover() throws JMSException { public void recover() throws JMSException {
getSession().recover(); getInternalSession().recover();
} }
public void rollback() throws JMSException { public void rollback() throws JMSException {
getSession().rollback(); getInternalSession().rollback();
}
public XAResource getXAResource() {
if (session == null) {
throw new IllegalStateException("Session is closed");
}
return session.getTransactionContext();
}
public Session getSession() {
return this;
} }
public void run() { public void run() {
@ -206,55 +220,55 @@ public class PooledSession implements TopicSession, QueueSession {
// Consumer related methods // Consumer related methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
public QueueBrowser createBrowser(Queue queue) throws JMSException { public QueueBrowser createBrowser(Queue queue) throws JMSException {
return addQueueBrowser(getSession().createBrowser(queue)); return addQueueBrowser(getInternalSession().createBrowser(queue));
} }
public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException { public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
return addQueueBrowser(getSession().createBrowser(queue, selector)); return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
} }
public MessageConsumer createConsumer(Destination destination) throws JMSException { public MessageConsumer createConsumer(Destination destination) throws JMSException {
return addConsumer(getSession().createConsumer(destination)); return addConsumer(getInternalSession().createConsumer(destination));
} }
public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
return addConsumer(getSession().createConsumer(destination, selector)); return addConsumer(getInternalSession().createConsumer(destination, selector));
} }
public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException { public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
return addConsumer(getSession().createConsumer(destination, selector, noLocal)); return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
} }
public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException { public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
return addTopicSubscriber(getSession().createDurableSubscriber(topic, selector)); return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
} }
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
return addTopicSubscriber(getSession().createDurableSubscriber(topic, name, selector, noLocal)); return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
} }
public MessageListener getMessageListener() throws JMSException { public MessageListener getMessageListener() throws JMSException {
return getSession().getMessageListener(); return getInternalSession().getMessageListener();
} }
public void setMessageListener(MessageListener messageListener) throws JMSException { public void setMessageListener(MessageListener messageListener) throws JMSException {
getSession().setMessageListener(messageListener); getInternalSession().setMessageListener(messageListener);
} }
public TopicSubscriber createSubscriber(Topic topic) throws JMSException { public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
return addTopicSubscriber(getSession().createSubscriber(topic)); return addTopicSubscriber(getInternalSession().createSubscriber(topic));
} }
public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException { public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
return addTopicSubscriber(getSession().createSubscriber(topic, selector, local)); return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
} }
public QueueReceiver createReceiver(Queue queue) throws JMSException { public QueueReceiver createReceiver(Queue queue) throws JMSException {
return addQueueReceiver(getSession().createReceiver(queue)); return addQueueReceiver(getInternalSession().createReceiver(queue));
} }
public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException { public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
return addQueueReceiver(getSession().createReceiver(queue, selector)); return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
} }
// Producer related methods // Producer related methods
@ -273,7 +287,7 @@ public class PooledSession implements TopicSession, QueueSession {
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected ActiveMQSession getSession() throws AlreadyClosedException { protected ActiveMQSession getInternalSession() throws AlreadyClosedException {
if (session == null) { if (session == null) {
throw new AlreadyClosedException("The session has already been closed"); throw new AlreadyClosedException("The session has already been closed");
} }
@ -282,21 +296,21 @@ public class PooledSession implements TopicSession, QueueSession {
public ActiveMQMessageProducer getMessageProducer() throws JMSException { public ActiveMQMessageProducer getMessageProducer() throws JMSException {
if (messageProducer == null) { if (messageProducer == null) {
messageProducer = (ActiveMQMessageProducer)getSession().createProducer(null); messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null);
} }
return messageProducer; return messageProducer;
} }
public ActiveMQQueueSender getQueueSender() throws JMSException { public ActiveMQQueueSender getQueueSender() throws JMSException {
if (queueSender == null) { if (queueSender == null) {
queueSender = (ActiveMQQueueSender)getSession().createSender(null); queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null);
} }
return queueSender; return queueSender;
} }
public ActiveMQTopicPublisher getTopicPublisher() throws JMSException { public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
if (topicPublisher == null) { if (topicPublisher == null) {
topicPublisher = (ActiveMQTopicPublisher)getSession().createPublisher(null); topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null);
} }
return topicPublisher; return topicPublisher;
} }

View File

@ -78,7 +78,7 @@ public class SessionPool implements PoolableObjectFactory {
public void destroyObject(Object o) throws Exception { public void destroyObject(Object o) throws Exception {
PooledSession session = (PooledSession)o; PooledSession session = (PooledSession)o;
session.getSession().close(); session.getInternalSession().close();
} }
public boolean validateObject(Object o) { public boolean validateObject(Object o) {

View File

@ -69,7 +69,7 @@ public class XaConnectionPool extends ConnectionPool {
} }
protected XAResource createXaResource(PooledSession session) throws JMSException { protected XAResource createXaResource(PooledSession session) throws JMSException {
return session.getSession().getTransactionContext(); return session.getXAResource();
} }