From 8d28bbfdab0ecde2a69519fbcd7054e1b55730d2 Mon Sep 17 00:00:00 2001 From: David Jencks Date: Fri, 14 Mar 2008 19:23:55 +0000 Subject: [PATCH] AMQ-1618 Refine thread safety of pool access operations git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@637218 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ra/ServerSessionPoolImpl.java | 232 ++++++++++-------- 1 file changed, 125 insertions(+), 107 deletions(-) diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java index 5798c9387f..dae5af9d2e 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java @@ -20,6 +20,8 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.jms.JMSException; import javax.jms.ServerSession; @@ -45,9 +47,10 @@ public class ServerSessionPoolImpl implements ServerSessionPool { private final ActiveMQEndpointWorker activeMQAsfEndpointWorker; private final int maxSessions; - private List idleSessions = new ArrayList(); - private List activeSessions = new ArrayList(); - private AtomicBoolean closing = new AtomicBoolean(false); + private final List idleSessions = new ArrayList(); + private final List activeSessions = new ArrayList(); + private final Lock sessionLock = new ReentrantLock(); + private final AtomicBoolean closing = new AtomicBoolean(false); public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) { this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker; @@ -100,48 +103,32 @@ public class ServerSessionPoolImpl implements ServerSessionPool { if (closing.get()) { throw new JMSException("Session Pool Shutting Down."); } - ServerSessionImpl ss = null; - synchronized (idleSessions) { - if (idleSessions.size() > 0) { - ss = idleSessions.remove(idleSessions.size() - 1); - } + sessionLock.lock(); + try { + ss = getExistingServerSession(false); + } finally { + sessionLock.unlock(); } if (ss != null) { - synchronized (activeSessions) { - activeSessions.add(ss); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Using idle session: " + ss); - } return ss; } - - synchronized (activeSessions) { - // Are we at the upper limit? - if (activeSessions.size() >= maxSessions) { - // then reuse the already created sessions.. - // This is going to queue up messages into a session for - // processing. - return getExistingServerSession(); - } - } - ss = createServerSessionImpl(); - // We may not be able to create a session due to the container - // restricting us. - if (ss == null) { - synchronized (activeSessions) { - if (activeSessions.isEmpty()) { - throw new JMSException( - "Endpoint factory did not allow creation any endpoints."); + sessionLock.lock(); + try { + // We may not be able to create a session due to the container + // restricting us. + if (ss == null) { + if (activeSessions.isEmpty() && idleSessions.isEmpty()) { + throw new JMSException("Endpoint factory did not allow creation of any endpoints."); } - } - return getExistingServerSession(); - } - synchronized (activeSessions) { - activeSessions.add(ss); + ss = getExistingServerSession(true); + } else { + activeSessions.add(ss); + } + } finally { + sessionLock.unlock(); } if (LOG.isDebugEnabled()) { LOG.debug("Created a new session: " + ss); @@ -150,6 +137,94 @@ public class ServerSessionPoolImpl implements ServerSessionPool { } + /** + * Must be called with sessionLock held. + * Returns an idle session if one exists or an active session if no more + * sessions can be created. Sessions can not be created if force is true + * or activeSessions >= maxSessions. + * @param force do not check activeSessions >= maxSessions, return an active connection anyway. + * @return an already existing session. + */ + private ServerSessionImpl getExistingServerSession(boolean force) { + ServerSessionImpl ss = null; + if (idleSessions.size() > 0) { + ss = idleSessions.remove(idleSessions.size() - 1); + } + if (ss != null) { + activeSessions.add(ss); + if (LOG.isDebugEnabled()) { + LOG.debug("Using idle session: " + ss); + } + } else if (force || activeSessions.size() >= maxSessions) { + // If we are at the upper limit + // then reuse the already created sessions.. + // This is going to queue up messages into a session for + // processing. + ss = getExistingActiveServerSession(); + } + return ss; + } + + /** + * Must be called with sessionLock held. + * Returns the first session from activeSessions, shifting it to last. + * @return session + */ + private ServerSessionImpl getExistingActiveServerSession() { + ServerSessionImpl ss = null; + if (!activeSessions.isEmpty()) { + if (activeSessions.size() > 1) { + // round robin + ss = activeSessions.remove(0); + activeSessions.add(ss); + } else { + ss = activeSessions.get(0); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Reusing an active session: " + ss); + } + return ss; + } + + public void returnToPool(ServerSessionImpl ss) { + if (LOG.isDebugEnabled()) { + LOG.debug("Session returned to pool: " + ss); + } + sessionLock.lock(); + try { + activeSessions.remove(ss); + idleSessions.add(ss); + } finally { + sessionLock.unlock(); + } + synchronized (closing) { + closing.notify(); + } + } + + public void removeFromPool(ServerSessionImpl ss) { + sessionLock.lock(); + try { + activeSessions.remove(ss); + } finally { + sessionLock.unlock(); + } + try { + ActiveMQSession session = (ActiveMQSession)ss.getSession(); + List l = session.getUnconsumedMessages(); + for (Iterator i = l.iterator(); i.hasNext();) { + dispatchToSession((MessageDispatch)i.next()); + } + } catch (Throwable t) { + LOG.error("Error redispatching unconsumed messages from stale session", t); + } + ss.close(); + synchronized (closing) { + closing.notify(); + } + } + /** * @param messageDispatch * the message to dispatch @@ -177,69 +252,15 @@ public class ServerSessionPoolImpl implements ServerSessionPool { serverSession.start(); } - /** - * @return session - */ - private ServerSession getExistingServerSession() { - ServerSessionImpl ss = null; - if (!activeSessions.isEmpty()) { - if (activeSessions.size() > 1) { - // round robin - ss = activeSessions.remove(0); - activeSessions.add(ss); - } else { - ss = activeSessions.get(0); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Reusing an active session: " + ss); - } - return ss; - } - - public void returnToPool(ServerSessionImpl ss) { - if (LOG.isDebugEnabled()) { - LOG.debug("Session returned to pool: " + ss); - } - synchronized(activeSessions) { - activeSessions.remove(ss); - } - synchronized(idleSessions) { - idleSessions.add(ss); - } - synchronized (closing) { - closing.notify(); - } - } - - public void removeFromPool(ServerSessionImpl ss) { - synchronized(activeSessions) { - activeSessions.remove(ss); - } - try { - ActiveMQSession session = (ActiveMQSession)ss.getSession(); - List l = session.getUnconsumedMessages(); - for (Iterator i = l.iterator(); i.hasNext();) { - dispatchToSession((MessageDispatch)i.next()); - } - } catch (Throwable t) { - LOG.error("Error redispatching unconsumed messages from stale session", t); - } - ss.close(); - synchronized (closing) { - closing.notify(); - } - } - public void close() { closing.set(true); - closeIdleSessions(); + int activeCount = closeIdleSessions(); // we may have to wait erroneously 250ms if an // active session is removed during our wait and we // are not notified - while (getActiveSessionSize() > 0) { + while (activeCount > 0) { if (LOG.isDebugEnabled()) { - LOG.debug("Active Sessions = " + getActiveSessionSize()); + LOG.debug("Active Sessions = " + activeCount); } try { synchronized (closing) { @@ -249,18 +270,21 @@ public class ServerSessionPoolImpl implements ServerSessionPool { Thread.currentThread().interrupt(); return; } - closeIdleSessions(); + activeCount = closeIdleSessions(); } } - - private void closeIdleSessions() { - synchronized(idleSessions) { - for (Iterator iter = idleSessions.iterator(); iter.hasNext();) { - ServerSessionImpl ss = iter.next(); + + private int closeIdleSessions() { + sessionLock.lock(); + try { + for (ServerSessionImpl ss : idleSessions) { ss.close(); } idleSessions.clear(); + return activeSessions.size(); + } finally { + sessionLock.unlock(); } } @@ -270,18 +294,12 @@ public class ServerSessionPoolImpl implements ServerSessionPool { public boolean isClosing() { return closing.get(); } - + /** * @param closing The closing to set. */ public void setClosing(boolean closing) { this.closing.set(closing); } - - private int getActiveSessionSize() { - synchronized(activeSessions) { - return activeSessions.size(); - } - } }