diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java index 37205dc813..7acb17bc37 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java @@ -421,7 +421,11 @@ public abstract class AbstractConnection implements Service, Connection, Task, C if( ss == null ) throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId); broker.addProducer(cs.getContext(), info); - ss.addProducer(info); + try { + ss.addProducer(info); + } catch (IllegalStateException e) { + broker.removeProducer(cs.getContext(), info); + } return null; } @@ -451,7 +455,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId); broker.addConsumer(cs.getContext(), info); - ss.addConsumer(info); + try { + ss.addConsumer(info); + } catch (IllegalStateException e) { + broker.removeConsumer(cs.getContext(), info); + } + return null; } @@ -476,8 +485,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C ConnectionId connectionId = info.getSessionId().getParentId(); ConnectionState cs = lookupConnectionState(connectionId); - broker.addSession(cs.getContext(), info); - cs.addSession(info); + broker.addSession(cs.getContext(), info); + try { + cs.addSession(info); + } catch (IllegalStateException e) { + broker.removeSession(cs.getContext(), info); + } return null; } @@ -487,6 +500,10 @@ public abstract class AbstractConnection implements Service, Connection, Task, C ConnectionState cs = lookupConnectionState(connectionId); SessionState session = cs.getSessionState(id); + + // Don't let new consumers or producers get added while we are closing this down. + session.shutdown(); + if( session == null ) throw new IllegalStateException("Cannot remove session that had not been registered: "+id); @@ -544,6 +561,9 @@ public abstract class AbstractConnection implements Service, Connection, Task, C ConnectionState cs = lookupConnectionState(id); + // Don't allow things to be added to the connection state while we are shutting down. + cs.shutdown(); + // Cascade the connection stop to the sessions. for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java index 61c20d15cc..480f719708 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; @@ -37,6 +38,7 @@ public class ConnectionState { final ConnectionInfo info; private final ConcurrentHashMap sessions = new ConcurrentHashMap(); private final List tempDestinations = Collections.synchronizedList(new ArrayList()); + private final AtomicBoolean shutdown = new AtomicBoolean(false); public ConnectionState(ConnectionInfo info) { this.info = info; @@ -49,10 +51,11 @@ public class ConnectionState { } public void addTempDestination(DestinationInfo info) { + checkShutdown(); tempDestinations.add(info); } - public void removeTempDestination(ActiveMQDestination destination) { + public void removeTempDestination(ActiveMQDestination destination) { for (Iterator iter = tempDestinations.iterator(); iter.hasNext();) { DestinationInfo di = (DestinationInfo) iter.next(); if( di.getDestination().equals(destination) ) { @@ -62,6 +65,7 @@ public class ConnectionState { } public void addSession(SessionInfo info) { + checkShutdown(); sessions.put(info.getSessionId(), new SessionState(info)); } public SessionState removeSession(SessionId id) { @@ -85,5 +89,19 @@ public class ConnectionState { public Collection getSessionStates() { return sessions.values(); - } + } + + private void checkShutdown() { + if( shutdown.get() ) + throw new IllegalStateException("Disposed"); + } + + public void shutdown() { + if( shutdown.compareAndSet(false, true) ) { + for (Iterator iter = sessions.values().iterator(); iter.hasNext();) { + SessionState ss = (SessionState) iter.next(); + ss.shutdown(); + } + } + } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java b/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java index 257cc7a25d..8ed940c648 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java @@ -19,6 +19,7 @@ package org.apache.activemq.state; import java.util.Collection; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; @@ -33,6 +34,7 @@ public class SessionState { public final ConcurrentHashMap producers = new ConcurrentHashMap(); public final ConcurrentHashMap consumers = new ConcurrentHashMap(); + private final AtomicBoolean shutdown = new AtomicBoolean(false); public SessionState(SessionInfo info) { this.info = info; @@ -42,6 +44,7 @@ public class SessionState { } public void addProducer(ProducerInfo info) { + checkShutdown(); producers.put(info.getProducerId(), new ProducerState(info)); } public ProducerState removeProducer(ProducerId id) { @@ -49,6 +52,7 @@ public class SessionState { } public void addConsumer(ConsumerInfo info) { + checkShutdown(); consumers.put(info.getConsumerId(), new ConsumerState(info)); } public ConsumerState removeConsumer(ConsumerId id) { @@ -72,5 +76,15 @@ public class SessionState { public Collection getConsumerStates() { return consumers.values(); - } + } + + private void checkShutdown() { + if( shutdown.get() ) + throw new IllegalStateException("Disposed"); + } + + public void shutdown() { + shutdown.set(false); + } + } \ No newline at end of file