mirror of https://github.com/apache/activemq.git
Improvements to removing stale connections
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@634797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c803f531e3
commit
5e747b070d
|
@ -109,10 +109,6 @@ public class SingleTransportConnectionStateRegister implements TransportConnect
|
|||
|
||||
public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
|
||||
TransportConnectionState cs = connectionState;
|
||||
if (cs == null) {
|
||||
throw new IllegalStateException("Cannot lookup a connection that had not been registered: "
|
||||
+ connectionId);
|
||||
}
|
||||
return cs;
|
||||
}
|
||||
|
||||
|
|
|
@ -673,44 +673,46 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
return null;
|
||||
}
|
||||
|
||||
public Response processRemoveConnection(ConnectionId id) throws InterruptedException {
|
||||
public synchronized Response processRemoveConnection(ConnectionId id) throws InterruptedException {
|
||||
TransportConnectionState cs = lookupConnectionState(id);
|
||||
// Don't allow things to be added to the connection state while we are
|
||||
// shutting down.
|
||||
cs.shutdown();
|
||||
|
||||
// Cascade the connection stop to the sessions.
|
||||
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
|
||||
SessionId sessionId = (SessionId)iter.next();
|
||||
try {
|
||||
processRemoveSession(sessionId);
|
||||
} catch (Throwable e) {
|
||||
SERVICELOG.warn("Failed to remove session " + sessionId, e);
|
||||
if (cs != null) {
|
||||
// Don't allow things to be added to the connection state while we are
|
||||
// shutting down.
|
||||
cs.shutdown();
|
||||
|
||||
// Cascade the connection stop to the sessions.
|
||||
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
|
||||
SessionId sessionId = (SessionId)iter.next();
|
||||
try {
|
||||
processRemoveSession(sessionId);
|
||||
} catch (Throwable e) {
|
||||
SERVICELOG.warn("Failed to remove session " + sessionId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Cascade the connection stop to temp destinations.
|
||||
for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) {
|
||||
DestinationInfo di = (DestinationInfo)iter.next();
|
||||
try {
|
||||
broker.removeDestination(cs.getContext(), di.getDestination(), 0);
|
||||
} catch (Throwable e) {
|
||||
SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
|
||||
// Cascade the connection stop to temp destinations.
|
||||
for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) {
|
||||
DestinationInfo di = (DestinationInfo)iter.next();
|
||||
try {
|
||||
broker.removeDestination(cs.getContext(), di.getDestination(), 0);
|
||||
} catch (Throwable e) {
|
||||
SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
|
||||
}
|
||||
iter.remove();
|
||||
}
|
||||
iter.remove();
|
||||
}
|
||||
try {
|
||||
broker.removeConnection(cs.getContext(), cs.getInfo(), null);
|
||||
} catch (Throwable e) {
|
||||
SERVICELOG.warn("Failed to remove connection " + cs.getInfo(), e);
|
||||
}
|
||||
|
||||
TransportConnectionState state = unregisterConnectionState(id);
|
||||
if (state != null) {
|
||||
synchronized (brokerConnectionStates) {
|
||||
// If we are the last reference, we should remove the state
|
||||
// from the broker.
|
||||
if (state.decrementReference() == 0) {
|
||||
brokerConnectionStates.remove(id);
|
||||
try {
|
||||
broker.removeConnection(cs.getContext(), cs.getInfo(), null);
|
||||
} catch (Throwable e) {
|
||||
SERVICELOG.warn("Failed to remove connection " + cs.getInfo(), e);
|
||||
}
|
||||
|
||||
TransportConnectionState state = unregisterConnectionState(id);
|
||||
if (state != null) {
|
||||
synchronized (brokerConnectionStates) {
|
||||
// If we are the last reference, we should remove the state
|
||||
// from the broker.
|
||||
if (state.decrementReference() == 0) {
|
||||
brokerConnectionStates.remove(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -112,9 +112,12 @@ public class ConnectionSplitBroker extends BrokerFilter implements MessageListen
|
|||
String str = "connectionId=" + old.getConnectionId() +",clientId="+old.getClientId();
|
||||
LOG.warn("Removing stale connection: " + str);
|
||||
try {
|
||||
old.getConnection().stop();
|
||||
//remove connection states
|
||||
TransportConnection connection = (TransportConnection) old.getConnection();
|
||||
connection.processRemoveConnection(old.getConnectionId());
|
||||
connection.stopAsync();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to remove stale connection: " + str);
|
||||
LOG.error("Failed to remove stale connection: " + str,e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue