AMQ-2166 unregister session proxies when they are closed. Patch (modified) from Mario Siegenthaler, slightly modified

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@813992 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
David Jencks 2009-09-11 19:31:11 +00:00
parent 7da608c411
commit 7167041ee6
2 changed files with 51 additions and 33 deletions

View File

@ -17,7 +17,7 @@
package org.apache.activemq.ra; package org.apache.activemq.ra;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.List;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionConsumer; import javax.jms.ConnectionConsumer;
@ -34,7 +34,6 @@ import javax.jms.Session;
import javax.jms.Topic; import javax.jms.Topic;
import javax.jms.TopicConnection; import javax.jms.TopicConnection;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQQueueSession; import org.apache.activemq.ActiveMQQueueSession;
import org.apache.activemq.ActiveMQSession; import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicSession; import org.apache.activemq.ActiveMQTopicSession;
@ -48,7 +47,7 @@ import org.apache.activemq.ActiveMQTopicSession;
public class ManagedConnectionProxy implements Connection, QueueConnection, TopicConnection, ExceptionListener { public class ManagedConnectionProxy implements Connection, QueueConnection, TopicConnection, ExceptionListener {
private ActiveMQManagedConnection managedConnection; private ActiveMQManagedConnection managedConnection;
private ArrayList<ManagedSessionProxy> sessions = new ArrayList<ManagedSessionProxy>(); private final List<ManagedSessionProxy> sessions = new ArrayList<ManagedSessionProxy>();
private ExceptionListener exceptionListener; private ExceptionListener exceptionListener;
public ManagedConnectionProxy(ActiveMQManagedConnection managedConnection) { public ManagedConnectionProxy(ActiveMQManagedConnection managedConnection) {
@ -73,18 +72,19 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
public void cleanup() { public void cleanup() {
exceptionListener = null; exceptionListener = null;
managedConnection = null; managedConnection = null;
for (Iterator<ManagedSessionProxy> iter = sessions.iterator(); iter.hasNext();) { synchronized (sessions) {
ManagedSessionProxy p = iter.next(); for (ManagedSessionProxy p : sessions) {
try { try {
p.cleanup(); //TODO is this dangerous? should we copy the list before iterating?
} catch (JMSException ignore) { p.cleanup();
} catch (JMSException ignore) {
}
} }
iter.remove(); sessions.clear();
} }
} }
/** /**
*
* @return "physical" underlying activemq connection, if proxy is associated with a managed connection * @return "physical" underlying activemq connection, if proxy is associated with a managed connection
* @throws javax.jms.JMSException if managed connection is null * @throws javax.jms.JMSException if managed connection is null
*/ */
@ -96,7 +96,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
} }
/** /**
* @param transacted Whether session is transacted * @param transacted Whether session is transacted
* @param acknowledgeMode session acknowledge mode * @param acknowledgeMode session acknowledge mode
* @return session proxy * @return session proxy
* @throws JMSException on error * @throws JMSException on error
@ -106,7 +106,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
} }
/** /**
* @param transacted Whether session is transacted * @param transacted Whether session is transacted
* @param acknowledgeMode session acknowledge mode * @param acknowledgeMode session acknowledge mode
* @return session proxy * @return session proxy
* @throws JMSException on error * @throws JMSException on error
@ -115,24 +115,33 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) { if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) {
acknowledgeMode = Session.AUTO_ACKNOWLEDGE; acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
} }
// ActiveMQSession session = (ActiveMQSession)getConnection().createSession(true, acknowledgeMode); ActiveMQSession session = (ActiveMQSession) getConnection().createSession(transacted, acknowledgeMode);
ActiveMQSession session = (ActiveMQSession)getConnection().createSession(transacted, acknowledgeMode);
ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext()); ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext());
session.setTransactionContext(txContext); session.setTransactionContext(txContext);
ManagedSessionProxy p = new ManagedSessionProxy(session); ManagedSessionProxy p = new ManagedSessionProxy(session, this);
p.setUseSharedTxContext(managedConnection.isInManagedTx()); p.setUseSharedTxContext(managedConnection.isInManagedTx());
sessions.add(p); synchronized (sessions) {
sessions.add(p);
}
return p; return p;
} }
protected void sessionClosed(ManagedSessionProxy session) {
synchronized (sessions) {
sessions.remove(session);
}
}
public void setUseSharedTxContext(boolean enable) throws JMSException { public void setUseSharedTxContext(boolean enable) throws JMSException {
for (ManagedSessionProxy p : sessions) { synchronized (sessions) {
p.setUseSharedTxContext(enable); for (ManagedSessionProxy p : sessions) {
p.setUseSharedTxContext(enable);
}
} }
} }
/** /**
* @param transacted Whether session is transacted * @param transacted Whether session is transacted
* @param acknowledgeMode session acknowledge mode * @param acknowledgeMode session acknowledge mode
* @return session proxy * @return session proxy
* @throws JMSException on error * @throws JMSException on error
@ -142,7 +151,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
} }
/** /**
* @param transacted Whether session is transacted * @param transacted Whether session is transacted
* @param acknowledgeMode session acknowledge mode * @param acknowledgeMode session acknowledge mode
* @return session proxy * @return session proxy
* @throws JMSException on error * @throws JMSException on error
@ -152,7 +161,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
} }
/** /**
* @return * @return client id from delegate
* @throws JMSException * @throws JMSException
*/ */
public String getClientID() throws JMSException { public String getClientID() throws JMSException {
@ -160,7 +169,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
} }
/** /**
* @return * @return exception listener from delegate
* @throws JMSException * @throws JMSException
*/ */
public ExceptionListener getExceptionListener() throws JMSException { public ExceptionListener getExceptionListener() throws JMSException {
@ -168,7 +177,7 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
} }
/** /**
* @return * @return connection metadata from delegate
* @throws JMSException * @throws JMSException
*/ */
public ConnectionMetaData getMetaData() throws JMSException { public ConnectionMetaData getMetaData() throws JMSException {
@ -176,7 +185,8 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
} }
/** /**
* @param clientID * Sets client id on delegate
* @param clientID new clientId
* @throws JMSException * @throws JMSException
*/ */
public void setClientID(String clientID) throws JMSException { public void setClientID(String clientID) throws JMSException {
@ -184,7 +194,8 @@ public class ManagedConnectionProxy implements Connection, QueueConnection, Topi
} }
/** /**
* @param listener * sets exception listener on delegate
* @param listener new listener
* @throws JMSException * @throws JMSException
*/ */
public void setExceptionListener(ExceptionListener listener) throws JMSException { public void setExceptionListener(ExceptionListener listener) throws JMSException {

View File

@ -47,7 +47,7 @@ import org.apache.activemq.ActiveMQSession;
/** /**
* Acts as a pass through proxy for a JMS Session object. It intercepts events * Acts as a pass through proxy for a JMS Session object. It intercepts events
* that are of interest of the ActiveMQManagedConnection. * that are of interest of the ActiveMQManagedConnection. There is one proxy for each session.
* *
* @version $Revision$ * @version $Revision$
*/ */
@ -55,9 +55,11 @@ public class ManagedSessionProxy implements Session, QueueSession, TopicSession
private final ActiveMQSession session; private final ActiveMQSession session;
private boolean closed; private boolean closed;
private ManagedConnectionProxy connectionProxy;
public ManagedSessionProxy(ActiveMQSession session) { public ManagedSessionProxy(ActiveMQSession session, ManagedConnectionProxy connectionProxy) {
this.session = session; this.session = session;
this.connectionProxy = connectionProxy;
} }
public void setUseSharedTxContext(boolean enable) throws JMSException { public void setUseSharedTxContext(boolean enable) throws JMSException {
@ -70,14 +72,17 @@ public class ManagedSessionProxy implements Session, QueueSession, TopicSession
* @throws JMSException * @throws JMSException
*/ */
public void close() throws JMSException { public void close() throws JMSException {
if (closed) {
return;
}
cleanup(); cleanup();
connectionProxy.sessionClosed(this);
} }
/** /**
* Called by the ActiveMQManagedConnection to invalidate this proxy. * Called by the ManagedConnectionProxy to invalidate this proxy.
* *
* @throws JMSException * @throws JMSException if session proxy has a problem
* @throws JMSException
*/ */
public void cleanup() throws JMSException { public void cleanup() throws JMSException {
closed = true; closed = true;
@ -86,6 +91,8 @@ public class ManagedSessionProxy implements Session, QueueSession, TopicSession
/** /**
* *
* @return underlying session, unless this proxy is closed
* @throws javax.jms.JMSException if session is closed
*/ */
private Session getSession() throws JMSException { private Session getSession() throws JMSException {
if (closed) { if (closed) {