diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index eb3b4b11fc..012356b553 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -47,9 +47,9 @@ public class MQTTConnection implements RemotingConnection { private String clientID; - private final List failureListeners = Collections.synchronizedList(new ArrayList()); + private final List failureListeners = new CopyOnWriteArrayList<>(); - private final List closeListeners = Collections.synchronizedList(new ArrayList()); + private final List closeListeners = new CopyOnWriteArrayList<>(); public MQTTConnection(Connection transportConnection) throws Exception { this.transportConnection = transportConnection; @@ -100,15 +100,14 @@ public class MQTTConnection implements RemotingConnection { @Override public List removeCloseListeners() { - synchronized (closeListeners) { - List deletedCloseListeners = new ArrayList<>(closeListeners); - closeListeners.clear(); - return deletedCloseListeners; - } + List deletedCloseListeners = copyCloseListeners(); + closeListeners.clear(); + return deletedCloseListeners; } @Override public void setCloseListeners(List listeners) { + closeListeners.clear(); closeListeners.addAll(listeners); } @@ -119,19 +118,15 @@ public class MQTTConnection implements RemotingConnection { @Override public List removeFailureListeners() { - synchronized (failureListeners) { - List deletedFailureListeners = new ArrayList<>(failureListeners); - failureListeners.clear(); - return deletedFailureListeners; - } + List deletedFailureListeners = copyFailureListeners(); + failureListeners.clear(); + return deletedFailureListeners; } @Override public void setFailureListeners(List listeners) { - synchronized (failureListeners) { - failureListeners.clear(); - failureListeners.addAll(listeners); - } + failureListeners.clear(); + failureListeners.addAll(listeners); } @Override @@ -141,13 +136,20 @@ public class MQTTConnection implements RemotingConnection { @Override public void fail(ActiveMQException me) { - synchronized (failureListeners) { - for (FailureListener listener : failureListeners) { - listener.connectionFailed(me, false); - } + List copy = copyFailureListeners(); + for (FailureListener listener : copy) { + listener.connectionFailed(me, false); } } + private List copyFailureListeners() { + return new ArrayList<>(failureListeners); + } + + private List copyCloseListeners() { + return new ArrayList<>(closeListeners); + } + @Override public void fail(ActiveMQException me, String scaleDownTargetNodeID) { synchronized (failureListeners) {