From 9a1f9c2fe3d4c0cc32887d942890da833a4ff093 Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Tue, 3 Feb 2009 22:10:59 +0000 Subject: [PATCH] AMQ-2095: Make pooled sessions implement XASession for specific transaction management git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@740476 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/pool/JcaConnectionPool.java | 2 +- .../apache/activemq/pool/PooledSession.java | 90 +++++++++++-------- .../org/apache/activemq/pool/SessionPool.java | 2 +- .../activemq/pool/XaConnectionPool.java | 2 +- 4 files changed, 55 insertions(+), 41 deletions(-) diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java b/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java index c51f0acb4d..010bf440dc 100644 --- a/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java @@ -34,7 +34,7 @@ public class JcaConnectionPool extends XaConnectionPool { } protected XAResource createXaResource(PooledSession session) throws JMSException { - XAResource xares = new LocalAndXATransaction(session.getSession().getTransactionContext()); + XAResource xares = new LocalAndXATransaction(session.getInternalSession().getTransactionContext()); if (name != null) { xares = new WrapperNamedXAResource(xares, name); } diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java index e3b4f95263..c948257d78 100644 --- a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java @@ -42,6 +42,9 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.jms.XASession; +import javax.jms.Session; +import javax.transaction.xa.XAResource; import org.apache.activemq.ActiveMQMessageProducer; import org.apache.activemq.ActiveMQQueueSender; @@ -54,7 +57,7 @@ import org.apache.commons.logging.LogFactory; /** * @version $Revision: 1.1 $ */ -public class PooledSession implements TopicSession, QueueSession { +public class PooledSession implements Session, TopicSession, QueueSession, XASession { private static final transient Log LOG = LogFactory.getLog(PooledSession.class); private ActiveMQSession session; @@ -87,7 +90,7 @@ public class PooledSession implements TopicSession, QueueSession { // TODO a cleaner way to reset?? // lets reset the session - getSession().setMessageListener(null); + getInternalSession().setMessageListener(null); // Close any consumers and browsers that may have been created. for (Iterator iter = consumers.iterator(); iter.hasNext();) { @@ -105,7 +108,7 @@ public class PooledSession implements TopicSession, QueueSession { // maybe do a rollback? if (transactional) { try { - getSession().rollback(); + getInternalSession().rollback(); } catch (JMSException e) { LOG.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e); @@ -126,75 +129,86 @@ public class PooledSession implements TopicSession, QueueSession { } public void commit() throws JMSException { - getSession().commit(); + getInternalSession().commit(); } public BytesMessage createBytesMessage() throws JMSException { - return getSession().createBytesMessage(); + return getInternalSession().createBytesMessage(); } public MapMessage createMapMessage() throws JMSException { - return getSession().createMapMessage(); + return getInternalSession().createMapMessage(); } public Message createMessage() throws JMSException { - return getSession().createMessage(); + return getInternalSession().createMessage(); } public ObjectMessage createObjectMessage() throws JMSException { - return getSession().createObjectMessage(); + return getInternalSession().createObjectMessage(); } public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { - return getSession().createObjectMessage(serializable); + return getInternalSession().createObjectMessage(serializable); } public Queue createQueue(String s) throws JMSException { - return getSession().createQueue(s); + return getInternalSession().createQueue(s); } public StreamMessage createStreamMessage() throws JMSException { - return getSession().createStreamMessage(); + return getInternalSession().createStreamMessage(); } public TemporaryQueue createTemporaryQueue() throws JMSException { - return getSession().createTemporaryQueue(); + return getInternalSession().createTemporaryQueue(); } public TemporaryTopic createTemporaryTopic() throws JMSException { - return getSession().createTemporaryTopic(); + return getInternalSession().createTemporaryTopic(); } public void unsubscribe(String s) throws JMSException { - getSession().unsubscribe(s); + getInternalSession().unsubscribe(s); } public TextMessage createTextMessage() throws JMSException { - return getSession().createTextMessage(); + return getInternalSession().createTextMessage(); } public TextMessage createTextMessage(String s) throws JMSException { - return getSession().createTextMessage(s); + return getInternalSession().createTextMessage(s); } public Topic createTopic(String s) throws JMSException { - return getSession().createTopic(s); + return getInternalSession().createTopic(s); } public int getAcknowledgeMode() throws JMSException { - return getSession().getAcknowledgeMode(); + return getInternalSession().getAcknowledgeMode(); } public boolean getTransacted() throws JMSException { - return getSession().getTransacted(); + return getInternalSession().getTransacted(); } public void recover() throws JMSException { - getSession().recover(); + getInternalSession().recover(); } public void rollback() throws JMSException { - getSession().rollback(); + getInternalSession().rollback(); + } + + public XAResource getXAResource() { + if (session == null) { + throw new IllegalStateException("Session is closed"); + } + return session.getTransactionContext(); + } + + public Session getSession() { + return this; } public void run() { @@ -206,55 +220,55 @@ public class PooledSession implements TopicSession, QueueSession { // Consumer related methods // ------------------------------------------------------------------------- public QueueBrowser createBrowser(Queue queue) throws JMSException { - return addQueueBrowser(getSession().createBrowser(queue)); + return addQueueBrowser(getInternalSession().createBrowser(queue)); } public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException { - return addQueueBrowser(getSession().createBrowser(queue, selector)); + return addQueueBrowser(getInternalSession().createBrowser(queue, selector)); } public MessageConsumer createConsumer(Destination destination) throws JMSException { - return addConsumer(getSession().createConsumer(destination)); + return addConsumer(getInternalSession().createConsumer(destination)); } public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { - return addConsumer(getSession().createConsumer(destination, selector)); + return addConsumer(getInternalSession().createConsumer(destination, selector)); } public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException { - return addConsumer(getSession().createConsumer(destination, selector, noLocal)); + return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal)); } public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException { - return addTopicSubscriber(getSession().createDurableSubscriber(topic, selector)); + return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector)); } public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { - return addTopicSubscriber(getSession().createDurableSubscriber(topic, name, selector, noLocal)); + return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal)); } public MessageListener getMessageListener() throws JMSException { - return getSession().getMessageListener(); + return getInternalSession().getMessageListener(); } public void setMessageListener(MessageListener messageListener) throws JMSException { - getSession().setMessageListener(messageListener); + getInternalSession().setMessageListener(messageListener); } public TopicSubscriber createSubscriber(Topic topic) throws JMSException { - return addTopicSubscriber(getSession().createSubscriber(topic)); + return addTopicSubscriber(getInternalSession().createSubscriber(topic)); } public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException { - return addTopicSubscriber(getSession().createSubscriber(topic, selector, local)); + return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local)); } public QueueReceiver createReceiver(Queue queue) throws JMSException { - return addQueueReceiver(getSession().createReceiver(queue)); + return addQueueReceiver(getInternalSession().createReceiver(queue)); } public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException { - return addQueueReceiver(getSession().createReceiver(queue, selector)); + return addQueueReceiver(getInternalSession().createReceiver(queue, selector)); } // Producer related methods @@ -273,7 +287,7 @@ public class PooledSession implements TopicSession, QueueSession { // Implementation methods // ------------------------------------------------------------------------- - protected ActiveMQSession getSession() throws AlreadyClosedException { + protected ActiveMQSession getInternalSession() throws AlreadyClosedException { if (session == null) { throw new AlreadyClosedException("The session has already been closed"); } @@ -282,21 +296,21 @@ public class PooledSession implements TopicSession, QueueSession { public ActiveMQMessageProducer getMessageProducer() throws JMSException { if (messageProducer == null) { - messageProducer = (ActiveMQMessageProducer)getSession().createProducer(null); + messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null); } return messageProducer; } public ActiveMQQueueSender getQueueSender() throws JMSException { if (queueSender == null) { - queueSender = (ActiveMQQueueSender)getSession().createSender(null); + queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null); } return queueSender; } public ActiveMQTopicPublisher getTopicPublisher() throws JMSException { if (topicPublisher == null) { - topicPublisher = (ActiveMQTopicPublisher)getSession().createPublisher(null); + topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null); } return topicPublisher; } diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java b/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java index 9290ddcaaf..3ebb017775 100644 --- a/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java @@ -78,7 +78,7 @@ public class SessionPool implements PoolableObjectFactory { public void destroyObject(Object o) throws Exception { PooledSession session = (PooledSession)o; - session.getSession().close(); + session.getInternalSession().close(); } public boolean validateObject(Object o) { diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java b/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java index 081384d6e1..06a762f200 100644 --- a/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java @@ -69,7 +69,7 @@ public class XaConnectionPool extends ConnectionPool { } protected XAResource createXaResource(PooledSession session) throws JMSException { - return session.getSession().getTransactionContext(); + return session.getXAResource(); }