removed areas of contention (deadlocks)

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@432224 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-08-17 13:07:24 +00:00
parent aaccbd3bd4
commit d0da99a08c
2 changed files with 26 additions and 19 deletions

View File

@ -155,7 +155,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
/**
* @see java.lang.Runnable#run()
*/
synchronized public void run() {
public void run() {
log.debug("Running");
while (true) {
log.debug("run loop start");

View File

@ -35,6 +35,8 @@ import org.apache.activemq.ActiveMQTopicSession;
import org.apache.activemq.command.MessageDispatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* @version $Revision$ $Date$
@ -46,9 +48,9 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
private final int maxSessions;
private ArrayList idleSessions = new ArrayList();
private LinkedList activeSessions = new LinkedList();
private boolean closing = false;
private List idleSessions = new CopyOnWriteArrayList();
private List activeSessions = new CopyOnWriteArrayList();
private AtomicBoolean closing = new AtomicBoolean(false);
public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) {
this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
@ -92,15 +94,15 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
/**
*/
synchronized public ServerSession getServerSession() throws JMSException {
public ServerSession getServerSession() throws JMSException {
log.debug("ServerSession requested.");
if (closing) {
if (closing.get()) {
throw new JMSException("Session Pool Shutting Down.");
}
if (idleSessions.size() > 0) {
ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size() - 1);
activeSessions.addLast(ss);
activeSessions.add(ss);
log.debug("Using idle session: " + ss);
return ss;
} else {
@ -121,7 +123,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
return getExistingServerSession();
}
activeSessions.addLast(ss);
activeSessions.add(ss);
log.debug("Created a new session: " + ss);
return ss;
}
@ -154,20 +156,22 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
* @return
*/
private ServerSession getExistingServerSession() {
ServerSessionImpl ss = (ServerSessionImpl) activeSessions.removeFirst();
activeSessions.addLast(ss);
ServerSessionImpl ss = (ServerSessionImpl) activeSessions.remove(0);
activeSessions.add(ss);
log.debug("Reusing an active session: " + ss);
return ss;
}
synchronized public void returnToPool(ServerSessionImpl ss) {
public void returnToPool(ServerSessionImpl ss) {
log.debug("Session returned to pool: " + ss);
activeSessions.remove(ss);
idleSessions.add(ss);
notify();
synchronized(closing){
closing.notify();
}
}
synchronized public void removeFromPool(ServerSessionImpl ss) {
public void removeFromPool(ServerSessionImpl ss) {
activeSessions.remove(ss);
try {
ActiveMQSession session = (ActiveMQSession) ss.getSession();
@ -179,16 +183,19 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
log.error("Error redispatching unconsumed messages from stale session", t);
}
ss.close();
notify();
synchronized(closing){
closing.notify();
}
}
public void close() {
synchronized (this) {
closing = true;
synchronized (closing) {
closing.set(true);
closeIdleSessions();
while( activeSessions.size() > 0 ) {
System.out.println("ACtive Sessions = " + activeSessions.size());
try {
wait();
closing.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
@ -209,14 +216,14 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
* @return Returns the closing.
*/
public boolean isClosing(){
return closing;
return closing.get();
}
/**
* @param closing The closing to set.
*/
public void setClosing(boolean closing){
this.closing=closing;
this.closing.set(closing);
}
}