ARTEMIS-3929 Improving OpenWire clientIDSet

I have seen this contention while I was testing ARTEMIS-3928

This does change any semantics and current tests should be enough to validate this change
This commit is contained in:
Clebert Suconic 2022-08-07 22:20:45 -04:00 committed by clebertsuconic
parent 12b81e7a25
commit 568eb70fcd
1 changed files with 34 additions and 38 deletions

View File

@ -106,7 +106,7 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<>();
private final Map<String, AMQConnectionContext> clientIdSet = new ConcurrentHashMap<>();
private String brokerName;
@ -226,18 +226,16 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
}
public void removeConnection(ConnectionInfo info, Throwable error) throws InvalidClientIDException {
synchronized (clientIdSet) {
String clientId = info.getClientId();
if (clientId != null) {
AMQConnectionContext context = this.clientIdSet.remove(clientId);
if (context != null) {
//connection is still there and need to close
context.getConnection().disconnect(error != null);
this.connections.remove(context.getConnection());
}
} else {
throw new InvalidClientIDException("No clientID specified for connection disconnect request");
String clientId = info.getClientId();
if (clientId != null) {
AMQConnectionContext context = this.clientIdSet.remove(clientId);
if (context != null) {
//connection is still there and need to close
context.getConnection().disconnect(error != null);
this.connections.remove(context.getConnection());
}
} else {
throw new InvalidClientIDException("No clientID specified for connection disconnect request");
}
}
@ -404,35 +402,33 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
throw new InvalidClientIDException("No clientID specified for connection request");
}
synchronized (clientIdSet) {
AMQConnectionContext context;
context = clientIdSet.get(clientId);
if (context != null) {
if (info.isFailoverReconnect()) {
OpenWireConnection oldConnection = context.getConnection();
oldConnection.disconnect(true);
connections.remove(oldConnection);
connection.reconnect(context, info);
} else {
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
}
AMQConnectionContext context;
context = clientIdSet.get(clientId);
if (context != null) {
if (info.isFailoverReconnect()) {
OpenWireConnection oldConnection = context.getConnection();
oldConnection.disconnect(true);
connections.remove(oldConnection);
connection.reconnect(context, info);
} else {
//new connection
context = connection.initContext(info);
clientIdSet.put(clientId, context);
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
}
connections.add(connection);
ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
// do not distribute passwords in advisory messages. usernames okay
ConnectionInfo copy = info.copy();
copy.setPassword("");
fireAdvisory(context, topic, copy);
// init the conn
context.getConnection().addSessions(context.getConnectionState().getSessionIds());
} else {
//new connection
context = connection.initContext(info);
clientIdSet.put(clientId, context);
}
connections.add(connection);
ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
// do not distribute passwords in advisory messages. usernames okay
ConnectionInfo copy = info.copy();
copy.setPassword("");
fireAdvisory(context, topic, copy);
// init the conn
context.getConnection().addSessions(context.getConnectionState().getSessionIds());
}
public void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {