AMQ-1618 Refine thread safety of pool access operations

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@637218 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
David Jencks 2008-03-14 19:23:55 +00:00
parent ceb7e1a86c
commit 8d28bbfdab
1 changed files with 125 additions and 107 deletions

View File

@ -20,6 +20,8 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.JMSException;
import javax.jms.ServerSession;
@ -45,9 +47,10 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
private final int maxSessions;
private List<ServerSessionImpl> idleSessions = new ArrayList<ServerSessionImpl>();
private List<ServerSessionImpl> activeSessions = new ArrayList<ServerSessionImpl>();
private AtomicBoolean closing = new AtomicBoolean(false);
private final List<ServerSessionImpl> idleSessions = new ArrayList<ServerSessionImpl>();
private final List<ServerSessionImpl> activeSessions = new ArrayList<ServerSessionImpl>();
private final Lock sessionLock = new ReentrantLock();
private final AtomicBoolean closing = new AtomicBoolean(false);
public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) {
this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
@ -100,49 +103,33 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
if (closing.get()) {
throw new JMSException("Session Pool Shutting Down.");
}
ServerSessionImpl ss = null;
synchronized (idleSessions) {
if (idleSessions.size() > 0) {
ss = idleSessions.remove(idleSessions.size() - 1);
}
sessionLock.lock();
try {
ss = getExistingServerSession(false);
} finally {
sessionLock.unlock();
}
if (ss != null) {
synchronized (activeSessions) {
activeSessions.add(ss);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Using idle session: " + ss);
}
return ss;
}
synchronized (activeSessions) {
// Are we at the upper limit?
if (activeSessions.size() >= maxSessions) {
// then reuse the already created sessions..
// This is going to queue up messages into a session for
// processing.
return getExistingServerSession();
}
}
ss = createServerSessionImpl();
sessionLock.lock();
try {
// We may not be able to create a session due to the container
// restricting us.
if (ss == null) {
synchronized (activeSessions) {
if (activeSessions.isEmpty()) {
throw new JMSException(
"Endpoint factory did not allow creation any endpoints.");
}
if (activeSessions.isEmpty() && idleSessions.isEmpty()) {
throw new JMSException("Endpoint factory did not allow creation of any endpoints.");
}
return getExistingServerSession();
}
synchronized (activeSessions) {
ss = getExistingServerSession(true);
} else {
activeSessions.add(ss);
}
} finally {
sessionLock.unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Created a new session: " + ss);
}
@ -150,6 +137,94 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
}
/**
* Must be called with sessionLock held.
* Returns an idle session if one exists or an active session if no more
* sessions can be created. Sessions can not be created if force is true
* or activeSessions >= maxSessions.
* @param force do not check activeSessions >= maxSessions, return an active connection anyway.
* @return an already existing session.
*/
private ServerSessionImpl getExistingServerSession(boolean force) {
ServerSessionImpl ss = null;
if (idleSessions.size() > 0) {
ss = idleSessions.remove(idleSessions.size() - 1);
}
if (ss != null) {
activeSessions.add(ss);
if (LOG.isDebugEnabled()) {
LOG.debug("Using idle session: " + ss);
}
} else if (force || activeSessions.size() >= maxSessions) {
// If we are at the upper limit
// then reuse the already created sessions..
// This is going to queue up messages into a session for
// processing.
ss = getExistingActiveServerSession();
}
return ss;
}
/**
* Must be called with sessionLock held.
* Returns the first session from activeSessions, shifting it to last.
* @return session
*/
private ServerSessionImpl getExistingActiveServerSession() {
ServerSessionImpl ss = null;
if (!activeSessions.isEmpty()) {
if (activeSessions.size() > 1) {
// round robin
ss = activeSessions.remove(0);
activeSessions.add(ss);
} else {
ss = activeSessions.get(0);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reusing an active session: " + ss);
}
return ss;
}
public void returnToPool(ServerSessionImpl ss) {
if (LOG.isDebugEnabled()) {
LOG.debug("Session returned to pool: " + ss);
}
sessionLock.lock();
try {
activeSessions.remove(ss);
idleSessions.add(ss);
} finally {
sessionLock.unlock();
}
synchronized (closing) {
closing.notify();
}
}
public void removeFromPool(ServerSessionImpl ss) {
sessionLock.lock();
try {
activeSessions.remove(ss);
} finally {
sessionLock.unlock();
}
try {
ActiveMQSession session = (ActiveMQSession)ss.getSession();
List l = session.getUnconsumedMessages();
for (Iterator i = l.iterator(); i.hasNext();) {
dispatchToSession((MessageDispatch)i.next());
}
} catch (Throwable t) {
LOG.error("Error redispatching unconsumed messages from stale session", t);
}
ss.close();
synchronized (closing) {
closing.notify();
}
}
/**
* @param messageDispatch
* the message to dispatch
@ -177,69 +252,15 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
serverSession.start();
}
/**
* @return session
*/
private ServerSession getExistingServerSession() {
ServerSessionImpl ss = null;
if (!activeSessions.isEmpty()) {
if (activeSessions.size() > 1) {
// round robin
ss = activeSessions.remove(0);
activeSessions.add(ss);
} else {
ss = activeSessions.get(0);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reusing an active session: " + ss);
}
return ss;
}
public void returnToPool(ServerSessionImpl ss) {
if (LOG.isDebugEnabled()) {
LOG.debug("Session returned to pool: " + ss);
}
synchronized(activeSessions) {
activeSessions.remove(ss);
}
synchronized(idleSessions) {
idleSessions.add(ss);
}
synchronized (closing) {
closing.notify();
}
}
public void removeFromPool(ServerSessionImpl ss) {
synchronized(activeSessions) {
activeSessions.remove(ss);
}
try {
ActiveMQSession session = (ActiveMQSession)ss.getSession();
List l = session.getUnconsumedMessages();
for (Iterator i = l.iterator(); i.hasNext();) {
dispatchToSession((MessageDispatch)i.next());
}
} catch (Throwable t) {
LOG.error("Error redispatching unconsumed messages from stale session", t);
}
ss.close();
synchronized (closing) {
closing.notify();
}
}
public void close() {
closing.set(true);
closeIdleSessions();
int activeCount = closeIdleSessions();
// we may have to wait erroneously 250ms if an
// active session is removed during our wait and we
// are not notified
while (getActiveSessionSize() > 0) {
while (activeCount > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Active Sessions = " + getActiveSessionSize());
LOG.debug("Active Sessions = " + activeCount);
}
try {
synchronized (closing) {
@ -249,18 +270,21 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
Thread.currentThread().interrupt();
return;
}
closeIdleSessions();
activeCount = closeIdleSessions();
}
}
private void closeIdleSessions() {
synchronized(idleSessions) {
for (Iterator<ServerSessionImpl> iter = idleSessions.iterator(); iter.hasNext();) {
ServerSessionImpl ss = iter.next();
private int closeIdleSessions() {
sessionLock.lock();
try {
for (ServerSessionImpl ss : idleSessions) {
ss.close();
}
idleSessions.clear();
return activeSessions.size();
} finally {
sessionLock.unlock();
}
}
@ -278,10 +302,4 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
this.closing.set(closing);
}
private int getActiveSessionSize() {
synchronized(activeSessions) {
return activeSessions.size();
}
}
}