tidied up synchronization

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@637028 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-03-14 09:53:39 +00:00
parent c57bad8711
commit bbd2e47dbd
1 changed files with 115 additions and 54 deletions

View File

@ -16,9 +16,9 @@
*/
package org.apache.activemq.ra;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
@ -45,8 +45,8 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
private final int maxSessions;
private List<ServerSessionImpl> idleSessions = new CopyOnWriteArrayList<ServerSessionImpl>();
private List<ServerSessionImpl> activeSessions = new CopyOnWriteArrayList<ServerSessionImpl>();
private List<ServerSessionImpl> idleSessions = new ArrayList<ServerSessionImpl>();
private List<ServerSessionImpl> activeSessions = new ArrayList<ServerSessionImpl>();
private AtomicBoolean closing = new AtomicBoolean(false);
public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) {
@ -76,7 +76,9 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
} catch (UnavailableException e) {
// The container could be limiting us on the number of endpoints
// that are being created.
LOG.debug("Could not create an endpoint.", e);
if (LOG.isDebugEnabled()) {
LOG.debug("Could not create an endpoint.", e);
}
session.close();
return null;
}
@ -92,17 +94,30 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
/**
*/
public ServerSession getServerSession() throws JMSException {
LOG.debug("ServerSession requested.");
if (LOG.isDebugEnabled()) {
LOG.debug("ServerSession requested.");
}
if (closing.get()) {
throw new JMSException("Session Pool Shutting Down.");
}
if (idleSessions.size() > 0) {
ServerSessionImpl ss = idleSessions.remove(idleSessions.size() - 1);
activeSessions.add(ss);
LOG.debug("Using idle session: " + ss);
ServerSessionImpl ss = null;
synchronized (idleSessions) {
if (idleSessions.size() > 0) {
ss = idleSessions.remove(idleSessions.size() - 1);
}
}
if (ss != null) {
synchronized (activeSessions) {
activeSessions.add(ss);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Using idle session: " + ss);
}
return ss;
} else {
}
synchronized (activeSessions) {
// Are we at the upper limit?
if (activeSessions.size() >= maxSessions) {
// then reuse the already created sessions..
@ -110,66 +125,97 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
// processing.
return getExistingServerSession();
}
ServerSessionImpl ss = createServerSessionImpl();
// We may not be able to create a session due to the container
// restricting us.
if (ss == null) {
if (activeSessions.size() == 0) {
//no idle sessions, no active sessions, and we can't create a new session....
throw new JMSException("Endpoint factory did not allow creation of any endpoints.");
}
return getExistingServerSession();
}
activeSessions.add(ss);
LOG.debug("Created a new session: " + ss);
return ss;
}
ss = createServerSessionImpl();
// We may not be able to create a session due to the container
// restricting us.
if (ss == null) {
synchronized (activeSessions) {
if (activeSessions.isEmpty()) {
throw new JMSException(
"Endpoint factory did not allow creation any endpoints.");
}
}
return getExistingServerSession();
}
synchronized (activeSessions) {
activeSessions.add(ss);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Created a new session: " + ss);
}
return ss;
}
/**
* @param messageDispatch the message to dispatch
* @param messageDispatch
* the message to dispatch
* @throws JMSException
*/
private void dispatchToSession(MessageDispatch messageDispatch) throws JMSException {
private void dispatchToSession(MessageDispatch messageDispatch)
throws JMSException {
ServerSession serverSession = getServerSession();
Session s = serverSession.getSession();
ActiveMQSession session = null;
if (s instanceof ActiveMQSession) {
session = (ActiveMQSession)s;
session = (ActiveMQSession) s;
} else if (s instanceof ActiveMQQueueSession) {
session = (ActiveMQSession)s;
session = (ActiveMQSession) s;
} else if (s instanceof ActiveMQTopicSession) {
session = (ActiveMQSession)s;
session = (ActiveMQSession) s;
} else {
activeMQAsfEndpointWorker.connection.onAsyncException(new JMSException("Session pool provided an invalid session type: " + s.getClass()));
activeMQAsfEndpointWorker.connection
.onAsyncException(new JMSException(
"Session pool provided an invalid session type: "
+ s.getClass()));
}
session.dispatch(messageDispatch);
serverSession.start();
}
/**
* @return
* @return session
*/
private ServerSession getExistingServerSession() {
ServerSessionImpl ss = activeSessions.remove(0);
activeSessions.add(ss);
LOG.debug("Reusing an active session: " + ss);
ServerSessionImpl ss = null;
if (!activeSessions.isEmpty()) {
if (activeSessions.size() > 1) {
// round robin
ss = activeSessions.remove(0);
activeSessions.add(ss);
} else {
ss = activeSessions.get(0);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reusing an active session: " + ss);
}
return ss;
}
public void returnToPool(ServerSessionImpl ss) {
LOG.debug("Session returned to pool: " + ss);
activeSessions.remove(ss);
idleSessions.add(ss);
if (LOG.isDebugEnabled()) {
LOG.debug("Session returned to pool: " + ss);
}
synchronized(activeSessions) {
activeSessions.remove(ss);
}
synchronized(idleSessions) {
idleSessions.add(ss);
}
synchronized (closing) {
closing.notify();
}
}
public void removeFromPool(ServerSessionImpl ss) {
activeSessions.remove(ss);
synchronized(activeSessions) {
activeSessions.remove(ss);
}
try {
ActiveMQSession session = (ActiveMQSession)ss.getSession();
List l = session.getUnconsumedMessages();
@ -186,26 +232,35 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
}
public void close() {
synchronized (closing) {
closing.set(true);
closeIdleSessions();
while (activeSessions.size() > 0) {
LOG.debug("Active Sessions = " + activeSessions.size());
try {
closing.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
closeIdleSessions();
closing.set(true);
closeIdleSessions();
// we may have to wait erroneously 250ms if an
// active session is removed during our wait and we
// are not notified
while (getActiveSessionSize() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Active Sessions = " + getActiveSessionSize());
}
try {
synchronized (closing) {
closing.wait(250);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
closeIdleSessions();
}
}
private void closeIdleSessions() {
for (Iterator<ServerSessionImpl> iter = idleSessions.iterator(); iter.hasNext();) {
ServerSessionImpl ss = iter.next();
ss.close();
synchronized(idleSessions) {
for (Iterator<ServerSessionImpl> iter = idleSessions.iterator(); iter.hasNext();) {
ServerSessionImpl ss = iter.next();
ss.close();
}
idleSessions.clear();
}
}
@ -215,12 +270,18 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
public boolean isClosing() {
return closing.get();
}
/**
* @param closing The closing to set.
*/
public void setClosing(boolean closing) {
this.closing.set(closing);
}
private int getActiveSessionSize() {
synchronized(activeSessions) {
return activeSessions.size();
}
}
}