Hiram R. Chirino 2006-07-02 14:31:48 +00:00
parent 5143a2ad05
commit 4213222264
3 changed files with 59 additions and 7 deletions

View File

@ -421,7 +421,11 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( ss == null )
throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId);
broker.addProducer(cs.getContext(), info);
ss.addProducer(info);
try {
ss.addProducer(info);
} catch (IllegalStateException e) {
broker.removeProducer(cs.getContext(), info);
}
return null;
}
@ -451,7 +455,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId);
broker.addConsumer(cs.getContext(), info);
ss.addConsumer(info);
try {
ss.addConsumer(info);
} catch (IllegalStateException e) {
broker.removeConsumer(cs.getContext(), info);
}
return null;
}
@ -476,8 +485,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
ConnectionId connectionId = info.getSessionId().getParentId();
ConnectionState cs = lookupConnectionState(connectionId);
broker.addSession(cs.getContext(), info);
cs.addSession(info);
broker.addSession(cs.getContext(), info);
try {
cs.addSession(info);
} catch (IllegalStateException e) {
broker.removeSession(cs.getContext(), info);
}
return null;
}
@ -487,6 +500,10 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
ConnectionState cs = lookupConnectionState(connectionId);
SessionState session = cs.getSessionState(id);
// Don't let new consumers or producers get added while we are closing this down.
session.shutdown();
if( session == null )
throw new IllegalStateException("Cannot remove session that had not been registered: "+id);
@ -544,6 +561,9 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
ConnectionState cs = lookupConnectionState(id);
// Don't allow things to be added to the connection state while we are shutting down.
cs.shutdown();
// Cascade the connection stop to the sessions.
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {

View File

@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
@ -37,6 +38,7 @@ public class ConnectionState {
final ConnectionInfo info;
private final ConcurrentHashMap sessions = new ConcurrentHashMap();
private final List tempDestinations = Collections.synchronizedList(new ArrayList());
private final AtomicBoolean shutdown = new AtomicBoolean(false);
public ConnectionState(ConnectionInfo info) {
this.info = info;
@ -49,10 +51,11 @@ public class ConnectionState {
}
public void addTempDestination(DestinationInfo info) {
checkShutdown();
tempDestinations.add(info);
}
public void removeTempDestination(ActiveMQDestination destination) {
public void removeTempDestination(ActiveMQDestination destination) {
for (Iterator iter = tempDestinations.iterator(); iter.hasNext();) {
DestinationInfo di = (DestinationInfo) iter.next();
if( di.getDestination().equals(destination) ) {
@ -62,6 +65,7 @@ public class ConnectionState {
}
public void addSession(SessionInfo info) {
checkShutdown();
sessions.put(info.getSessionId(), new SessionState(info));
}
public SessionState removeSession(SessionId id) {
@ -85,5 +89,19 @@ public class ConnectionState {
public Collection getSessionStates() {
return sessions.values();
}
}
private void checkShutdown() {
if( shutdown.get() )
throw new IllegalStateException("Disposed");
}
public void shutdown() {
if( shutdown.compareAndSet(false, true) ) {
for (Iterator iter = sessions.values().iterator(); iter.hasNext();) {
SessionState ss = (SessionState) iter.next();
ss.shutdown();
}
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.state;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
@ -33,6 +34,7 @@ public class SessionState {
public final ConcurrentHashMap producers = new ConcurrentHashMap();
public final ConcurrentHashMap consumers = new ConcurrentHashMap();
private final AtomicBoolean shutdown = new AtomicBoolean(false);
public SessionState(SessionInfo info) {
this.info = info;
@ -42,6 +44,7 @@ public class SessionState {
}
public void addProducer(ProducerInfo info) {
checkShutdown();
producers.put(info.getProducerId(), new ProducerState(info));
}
public ProducerState removeProducer(ProducerId id) {
@ -49,6 +52,7 @@ public class SessionState {
}
public void addConsumer(ConsumerInfo info) {
checkShutdown();
consumers.put(info.getConsumerId(), new ConsumerState(info));
}
public ConsumerState removeConsumer(ConsumerId id) {
@ -72,5 +76,15 @@ public class SessionState {
public Collection getConsumerStates() {
return consumers.values();
}
}
private void checkShutdown() {
if( shutdown.get() )
throw new IllegalStateException("Disposed");
}
public void shutdown() {
shutdown.set(false);
}
}