diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java b/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java index 26ab5a8e8d..0fe6b662ba 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java @@ -109,10 +109,6 @@ public class SingleTransportConnectionStateRegister implements TransportConnect public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { TransportConnectionState cs = connectionState; - if (cs == null) { - throw new IllegalStateException("Cannot lookup a connection that had not been registered: " - + connectionId); - } return cs; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 40fbe03766..1c9e6421f7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -673,44 +673,46 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return null; } - public Response processRemoveConnection(ConnectionId id) throws InterruptedException { + public synchronized Response processRemoveConnection(ConnectionId id) throws InterruptedException { TransportConnectionState cs = lookupConnectionState(id); - // Don't allow things to be added to the connection state while we are - // shutting down. - cs.shutdown(); - - // Cascade the connection stop to the sessions. - for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { - SessionId sessionId = (SessionId)iter.next(); - try { - processRemoveSession(sessionId); - } catch (Throwable e) { - SERVICELOG.warn("Failed to remove session " + sessionId, e); + if (cs != null) { + // Don't allow things to be added to the connection state while we are + // shutting down. + cs.shutdown(); + + // Cascade the connection stop to the sessions. + for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { + SessionId sessionId = (SessionId)iter.next(); + try { + processRemoveSession(sessionId); + } catch (Throwable e) { + SERVICELOG.warn("Failed to remove session " + sessionId, e); + } } - } - // Cascade the connection stop to temp destinations. - for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) { - DestinationInfo di = (DestinationInfo)iter.next(); - try { - broker.removeDestination(cs.getContext(), di.getDestination(), 0); - } catch (Throwable e) { - SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e); + // Cascade the connection stop to temp destinations. + for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) { + DestinationInfo di = (DestinationInfo)iter.next(); + try { + broker.removeDestination(cs.getContext(), di.getDestination(), 0); + } catch (Throwable e) { + SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e); + } + iter.remove(); } - iter.remove(); - } - try { - broker.removeConnection(cs.getContext(), cs.getInfo(), null); - } catch (Throwable e) { - SERVICELOG.warn("Failed to remove connection " + cs.getInfo(), e); - } - - TransportConnectionState state = unregisterConnectionState(id); - if (state != null) { - synchronized (brokerConnectionStates) { - // If we are the last reference, we should remove the state - // from the broker. - if (state.decrementReference() == 0) { - brokerConnectionStates.remove(id); + try { + broker.removeConnection(cs.getContext(), cs.getInfo(), null); + } catch (Throwable e) { + SERVICELOG.warn("Failed to remove connection " + cs.getInfo(), e); + } + + TransportConnectionState state = unregisterConnectionState(id); + if (state != null) { + synchronized (brokerConnectionStates) { + // If we are the last reference, we should remove the state + // from the broker. + if (state.decrementReference() == 0) { + brokerConnectionStates.remove(id); + } } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java index 7ad2be2984..20f61ee3e6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java @@ -112,9 +112,12 @@ public class ConnectionSplitBroker extends BrokerFilter implements MessageListen String str = "connectionId=" + old.getConnectionId() +",clientId="+old.getClientId(); LOG.warn("Removing stale connection: " + str); try { - old.getConnection().stop(); + //remove connection states + TransportConnection connection = (TransportConnection) old.getConnection(); + connection.processRemoveConnection(old.getConnectionId()); + connection.stopAsync(); } catch (Exception e) { - LOG.error("Failed to remove stale connection: " + str); + LOG.error("Failed to remove stale connection: " + str,e); } } }