From 1ac3421e361f62f938bc3d306cb7290f40e68581 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Sun, 2 Jul 2006 13:43:19 +0000 Subject: [PATCH] Fixing http://issues.apache.org/activemq/browse/AMQ-724, async exception could close a connection while a new consumer is being added which resulted in the consumer not being removed from the broker when the connction was shut down. Danielius Jurna, thanks for the great bug report and problem determination! git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@418592 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/AbstractConnection.java | 28 ++++++++++++++++--- .../activemq/state/ConnectionState.java | 22 +++++++++++++-- .../apache/activemq/state/SessionState.java | 16 ++++++++++- 3 files changed, 59 insertions(+), 7 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java index 37205dc813..7acb17bc37 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java @@ -421,7 +421,11 @@ public abstract class AbstractConnection implements Service, Connection, Task, C if( ss == null ) throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId); broker.addProducer(cs.getContext(), info); - ss.addProducer(info); + try { + ss.addProducer(info); + } catch (IllegalStateException e) { + broker.removeProducer(cs.getContext(), info); + } return null; } @@ -451,7 +455,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId); broker.addConsumer(cs.getContext(), info); - ss.addConsumer(info); + try { + ss.addConsumer(info); + } catch (IllegalStateException e) { + broker.removeConsumer(cs.getContext(), info); + } + return null; } @@ -476,8 +485,12 @@ public abstract class AbstractConnection implements Service, Connection, Task, C ConnectionId connectionId = info.getSessionId().getParentId(); ConnectionState cs = lookupConnectionState(connectionId); - broker.addSession(cs.getContext(), info); - cs.addSession(info); + broker.addSession(cs.getContext(), info); + try { + cs.addSession(info); + } catch (IllegalStateException e) { + broker.removeSession(cs.getContext(), info); + } return null; } @@ -487,6 +500,10 @@ public abstract class AbstractConnection implements Service, Connection, Task, C ConnectionState cs = lookupConnectionState(connectionId); SessionState session = cs.getSessionState(id); + + // Don't let new consumers or producers get added while we are closing this down. + session.shutdown(); + if( session == null ) throw new IllegalStateException("Cannot remove session that had not been registered: "+id); @@ -544,6 +561,9 @@ public abstract class AbstractConnection implements Service, Connection, Task, C ConnectionState cs = lookupConnectionState(id); + // Don't allow things to be added to the connection state while we are shutting down. + cs.shutdown(); + // Cascade the connection stop to the sessions. for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java index 61c20d15cc..480f719708 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; @@ -37,6 +38,7 @@ public class ConnectionState { final ConnectionInfo info; private final ConcurrentHashMap sessions = new ConcurrentHashMap(); private final List tempDestinations = Collections.synchronizedList(new ArrayList()); + private final AtomicBoolean shutdown = new AtomicBoolean(false); public ConnectionState(ConnectionInfo info) { this.info = info; @@ -49,10 +51,11 @@ public class ConnectionState { } public void addTempDestination(DestinationInfo info) { + checkShutdown(); tempDestinations.add(info); } - public void removeTempDestination(ActiveMQDestination destination) { + public void removeTempDestination(ActiveMQDestination destination) { for (Iterator iter = tempDestinations.iterator(); iter.hasNext();) { DestinationInfo di = (DestinationInfo) iter.next(); if( di.getDestination().equals(destination) ) { @@ -62,6 +65,7 @@ public class ConnectionState { } public void addSession(SessionInfo info) { + checkShutdown(); sessions.put(info.getSessionId(), new SessionState(info)); } public SessionState removeSession(SessionId id) { @@ -85,5 +89,19 @@ public class ConnectionState { public Collection getSessionStates() { return sessions.values(); - } + } + + private void checkShutdown() { + if( shutdown.get() ) + throw new IllegalStateException("Disposed"); + } + + public void shutdown() { + if( shutdown.compareAndSet(false, true) ) { + for (Iterator iter = sessions.values().iterator(); iter.hasNext();) { + SessionState ss = (SessionState) iter.next(); + ss.shutdown(); + } + } + } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java b/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java index 257cc7a25d..8ed940c648 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java @@ -19,6 +19,7 @@ package org.apache.activemq.state; import java.util.Collection; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; @@ -33,6 +34,7 @@ public class SessionState { public final ConcurrentHashMap producers = new ConcurrentHashMap(); public final ConcurrentHashMap consumers = new ConcurrentHashMap(); + private final AtomicBoolean shutdown = new AtomicBoolean(false); public SessionState(SessionInfo info) { this.info = info; @@ -42,6 +44,7 @@ public class SessionState { } public void addProducer(ProducerInfo info) { + checkShutdown(); producers.put(info.getProducerId(), new ProducerState(info)); } public ProducerState removeProducer(ProducerId id) { @@ -49,6 +52,7 @@ public class SessionState { } public void addConsumer(ConsumerInfo info) { + checkShutdown(); consumers.put(info.getConsumerId(), new ConsumerState(info)); } public ConsumerState removeConsumer(ConsumerId id) { @@ -72,5 +76,15 @@ public class SessionState { public Collection getConsumerStates() { return consumers.values(); - } + } + + private void checkShutdown() { + if( shutdown.get() ) + throw new IllegalStateException("Disposed"); + } + + public void shutdown() { + shutdown.set(false); + } + } \ No newline at end of file