diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java index f153590cda..5205ab1d1e 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java @@ -17,7 +17,7 @@ package org.apache.activemq.ra; import java.util.ArrayList; -import java.util.Iterator; +import java.util.List; import javax.jms.Connection; import javax.jms.ConnectionConsumer; @@ -34,7 +34,6 @@ import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; - import org.apache.activemq.ActiveMQQueueSession; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.ActiveMQTopicSession; @@ -42,13 +41,13 @@ import org.apache.activemq.ActiveMQTopicSession; /** * Acts as a pass through proxy for a JMS Connection object. It intercepts * events that are of interest of the ActiveMQManagedConnection. - * + * * @version $Revision$ */ public class ManagedConnectionProxy implements Connection, QueueConnection, TopicConnection, ExceptionListener { private ActiveMQManagedConnection managedConnection; - private ArrayList sessions = new ArrayList(); + private final List sessions = new ArrayList(); private ExceptionListener exceptionListener; public ManagedConnectionProxy(ActiveMQManagedConnection managedConnection) { @@ -58,7 +57,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi /** * Used to let the ActiveMQManagedConnection that this connection handel is * not needed by the app. - * + * * @throws JMSException */ public void close() throws JMSException { @@ -73,18 +72,19 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi public void cleanup() { exceptionListener = null; managedConnection = null; - for (Iterator iter = sessions.iterator(); iter.hasNext();) { - ManagedSessionProxy p = iter.next(); - try { - p.cleanup(); - } catch (JMSException ignore) { + synchronized (sessions) { + for (ManagedSessionProxy p : sessions) { + try { + //TODO is this dangerous? should we copy the list before iterating? + p.cleanup(); + } catch (JMSException ignore) { + } } - iter.remove(); + sessions.clear(); } } /** - * * @return "physical" underlying activemq connection, if proxy is associated with a managed connection * @throws javax.jms.JMSException if managed connection is null */ @@ -96,7 +96,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi } /** - * @param transacted Whether session is transacted + * @param transacted Whether session is transacted * @param acknowledgeMode session acknowledge mode * @return session proxy * @throws JMSException on error @@ -106,7 +106,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi } /** - * @param transacted Whether session is transacted + * @param transacted Whether session is transacted * @param acknowledgeMode session acknowledge mode * @return session proxy * @throws JMSException on error @@ -115,24 +115,33 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) { acknowledgeMode = Session.AUTO_ACKNOWLEDGE; } -// ActiveMQSession session = (ActiveMQSession)getConnection().createSession(true, acknowledgeMode); - ActiveMQSession session = (ActiveMQSession)getConnection().createSession(transacted, acknowledgeMode); + ActiveMQSession session = (ActiveMQSession) getConnection().createSession(transacted, acknowledgeMode); ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext()); session.setTransactionContext(txContext); - ManagedSessionProxy p = new ManagedSessionProxy(session); + ManagedSessionProxy p = new ManagedSessionProxy(session, this); p.setUseSharedTxContext(managedConnection.isInManagedTx()); - sessions.add(p); + synchronized (sessions) { + sessions.add(p); + } return p; } + protected void sessionClosed(ManagedSessionProxy session) { + synchronized (sessions) { + sessions.remove(session); + } + } + public void setUseSharedTxContext(boolean enable) throws JMSException { - for (ManagedSessionProxy p : sessions) { - p.setUseSharedTxContext(enable); + synchronized (sessions) { + for (ManagedSessionProxy p : sessions) { + p.setUseSharedTxContext(enable); + } } } /** - * @param transacted Whether session is transacted + * @param transacted Whether session is transacted * @param acknowledgeMode session acknowledge mode * @return session proxy * @throws JMSException on error @@ -142,7 +151,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi } /** - * @param transacted Whether session is transacted + * @param transacted Whether session is transacted * @param acknowledgeMode session acknowledge mode * @return session proxy * @throws JMSException on error @@ -152,7 +161,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi } /** - * @return + * @return client id from delegate * @throws JMSException */ public String getClientID() throws JMSException { @@ -160,7 +169,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi } /** - * @return + * @return exception listener from delegate * @throws JMSException */ public ExceptionListener getExceptionListener() throws JMSException { @@ -168,7 +177,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi } /** - * @return + * @return connection metadata from delegate * @throws JMSException */ public ConnectionMetaData getMetaData() throws JMSException { @@ -176,7 +185,8 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi } /** - * @param clientID + * Sets client id on delegate + * @param clientID new clientId * @throws JMSException */ public void setClientID(String clientID) throws JMSException { @@ -184,7 +194,8 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi } /** - * @param listener + * sets exception listener on delegate + * @param listener new listener * @throws JMSException */ public void setExceptionListener(ExceptionListener listener) throws JMSException { diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java index 72fb1391c2..0279ee7919 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java @@ -47,7 +47,7 @@ import org.apache.activemq.ActiveMQSession; /** * Acts as a pass through proxy for a JMS Session object. It intercepts events - * that are of interest of the ActiveMQManagedConnection. + * that are of interest of the ActiveMQManagedConnection. There is one proxy for each session. * * @version $Revision$ */ @@ -55,9 +55,11 @@ public class ManagedSessionProxy implements Session, QueueSession, TopicSession private final ActiveMQSession session; private boolean closed; + private ManagedConnectionProxy connectionProxy; - public ManagedSessionProxy(ActiveMQSession session) { + public ManagedSessionProxy(ActiveMQSession session, ManagedConnectionProxy connectionProxy) { this.session = session; + this.connectionProxy = connectionProxy; } public void setUseSharedTxContext(boolean enable) throws JMSException { @@ -70,14 +72,17 @@ public class ManagedSessionProxy implements Session, QueueSession, TopicSession * @throws JMSException */ public void close() throws JMSException { + if (closed) { + return; + } cleanup(); + connectionProxy.sessionClosed(this); } /** - * Called by the ActiveMQManagedConnection to invalidate this proxy. + * Called by the ManagedConnectionProxy to invalidate this proxy. * - * @throws JMSException - * @throws JMSException + * @throws JMSException if session proxy has a problem */ public void cleanup() throws JMSException { closed = true; @@ -85,7 +90,9 @@ public class ManagedSessionProxy implements Session, QueueSession, TopicSession } /** - * + * + * @return underlying session, unless this proxy is closed + * @throws javax.jms.JMSException if session is closed */ private Session getSession() throws JMSException { if (closed) {