Fixing http://issues.apache.org/activemq/browse/AMQ-724, async exception could close a connection while a new consumer is being added which resulted in the consumer not being removed from the

broker when the connction was shut down.

Danielius Jurna, thanks for the great bug report and problem determination!



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@418592 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-07-02 13:43:19 +00:00
parent dc98d967b5
commit 1ac3421e36
3 changed files with 59 additions and 7 deletions

View File

@ -421,7 +421,11 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
if( ss == null ) if( ss == null )
throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId); throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId);
broker.addProducer(cs.getContext(), info); broker.addProducer(cs.getContext(), info);
ss.addProducer(info); try {
ss.addProducer(info);
} catch (IllegalStateException e) {
broker.removeProducer(cs.getContext(), info);
}
return null; return null;
} }
@ -451,7 +455,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId); throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId);
broker.addConsumer(cs.getContext(), info); broker.addConsumer(cs.getContext(), info);
ss.addConsumer(info); try {
ss.addConsumer(info);
} catch (IllegalStateException e) {
broker.removeConsumer(cs.getContext(), info);
}
return null; return null;
} }
@ -476,8 +485,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
ConnectionId connectionId = info.getSessionId().getParentId(); ConnectionId connectionId = info.getSessionId().getParentId();
ConnectionState cs = lookupConnectionState(connectionId); ConnectionState cs = lookupConnectionState(connectionId);
broker.addSession(cs.getContext(), info); broker.addSession(cs.getContext(), info);
cs.addSession(info); try {
cs.addSession(info);
} catch (IllegalStateException e) {
broker.removeSession(cs.getContext(), info);
}
return null; return null;
} }
@ -487,6 +500,10 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
ConnectionState cs = lookupConnectionState(connectionId); ConnectionState cs = lookupConnectionState(connectionId);
SessionState session = cs.getSessionState(id); SessionState session = cs.getSessionState(id);
// Don't let new consumers or producers get added while we are closing this down.
session.shutdown();
if( session == null ) if( session == null )
throw new IllegalStateException("Cannot remove session that had not been registered: "+id); throw new IllegalStateException("Cannot remove session that had not been registered: "+id);
@ -544,6 +561,9 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
ConnectionState cs = lookupConnectionState(id); ConnectionState cs = lookupConnectionState(id);
// Don't allow things to be added to the connection state while we are shutting down.
cs.shutdown();
// Cascade the connection stop to the sessions. // Cascade the connection stop to the sessions.
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {

View File

@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
@ -37,6 +38,7 @@ public class ConnectionState {
final ConnectionInfo info; final ConnectionInfo info;
private final ConcurrentHashMap sessions = new ConcurrentHashMap(); private final ConcurrentHashMap sessions = new ConcurrentHashMap();
private final List tempDestinations = Collections.synchronizedList(new ArrayList()); private final List tempDestinations = Collections.synchronizedList(new ArrayList());
private final AtomicBoolean shutdown = new AtomicBoolean(false);
public ConnectionState(ConnectionInfo info) { public ConnectionState(ConnectionInfo info) {
this.info = info; this.info = info;
@ -49,10 +51,11 @@ public class ConnectionState {
} }
public void addTempDestination(DestinationInfo info) { public void addTempDestination(DestinationInfo info) {
checkShutdown();
tempDestinations.add(info); tempDestinations.add(info);
} }
public void removeTempDestination(ActiveMQDestination destination) { public void removeTempDestination(ActiveMQDestination destination) {
for (Iterator iter = tempDestinations.iterator(); iter.hasNext();) { for (Iterator iter = tempDestinations.iterator(); iter.hasNext();) {
DestinationInfo di = (DestinationInfo) iter.next(); DestinationInfo di = (DestinationInfo) iter.next();
if( di.getDestination().equals(destination) ) { if( di.getDestination().equals(destination) ) {
@ -62,6 +65,7 @@ public class ConnectionState {
} }
public void addSession(SessionInfo info) { public void addSession(SessionInfo info) {
checkShutdown();
sessions.put(info.getSessionId(), new SessionState(info)); sessions.put(info.getSessionId(), new SessionState(info));
} }
public SessionState removeSession(SessionId id) { public SessionState removeSession(SessionId id) {
@ -85,5 +89,19 @@ public class ConnectionState {
public Collection getSessionStates() { public Collection getSessionStates() {
return sessions.values(); return sessions.values();
} }
private void checkShutdown() {
if( shutdown.get() )
throw new IllegalStateException("Disposed");
}
public void shutdown() {
if( shutdown.compareAndSet(false, true) ) {
for (Iterator iter = sessions.values().iterator(); iter.hasNext();) {
SessionState ss = (SessionState) iter.next();
ss.shutdown();
}
}
}
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.state;
import java.util.Collection; import java.util.Collection;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
@ -33,6 +34,7 @@ public class SessionState {
public final ConcurrentHashMap producers = new ConcurrentHashMap(); public final ConcurrentHashMap producers = new ConcurrentHashMap();
public final ConcurrentHashMap consumers = new ConcurrentHashMap(); public final ConcurrentHashMap consumers = new ConcurrentHashMap();
private final AtomicBoolean shutdown = new AtomicBoolean(false);
public SessionState(SessionInfo info) { public SessionState(SessionInfo info) {
this.info = info; this.info = info;
@ -42,6 +44,7 @@ public class SessionState {
} }
public void addProducer(ProducerInfo info) { public void addProducer(ProducerInfo info) {
checkShutdown();
producers.put(info.getProducerId(), new ProducerState(info)); producers.put(info.getProducerId(), new ProducerState(info));
} }
public ProducerState removeProducer(ProducerId id) { public ProducerState removeProducer(ProducerId id) {
@ -49,6 +52,7 @@ public class SessionState {
} }
public void addConsumer(ConsumerInfo info) { public void addConsumer(ConsumerInfo info) {
checkShutdown();
consumers.put(info.getConsumerId(), new ConsumerState(info)); consumers.put(info.getConsumerId(), new ConsumerState(info));
} }
public ConsumerState removeConsumer(ConsumerId id) { public ConsumerState removeConsumer(ConsumerId id) {
@ -72,5 +76,15 @@ public class SessionState {
public Collection getConsumerStates() { public Collection getConsumerStates() {
return consumers.values(); return consumers.values();
} }
private void checkShutdown() {
if( shutdown.get() )
throw new IllegalStateException("Disposed");
}
public void shutdown() {
shutdown.set(false);
}
} }