NO-JIRA Fixing Deadlock on MQTTConnection

Found during a testsuite run
This commit is contained in:
Clebert Suconic 2018-02-13 14:40:37 -05:00
parent b5bf5afde7
commit 5480d7bc5b
1 changed files with 23 additions and 21 deletions

View File

@ -18,8 +18,8 @@
package org.apache.activemq.artemis.core.protocol.mqtt; package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -47,9 +47,9 @@ public class MQTTConnection implements RemotingConnection {
private String clientID; private String clientID;
private final List<FailureListener> failureListeners = Collections.synchronizedList(new ArrayList<FailureListener>()); private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<>();
private final List<CloseListener> closeListeners = Collections.synchronizedList(new ArrayList<CloseListener>()); private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();
public MQTTConnection(Connection transportConnection) throws Exception { public MQTTConnection(Connection transportConnection) throws Exception {
this.transportConnection = transportConnection; this.transportConnection = transportConnection;
@ -100,15 +100,14 @@ public class MQTTConnection implements RemotingConnection {
@Override @Override
public List<CloseListener> removeCloseListeners() { public List<CloseListener> removeCloseListeners() {
synchronized (closeListeners) { List<CloseListener> deletedCloseListeners = copyCloseListeners();
List<CloseListener> deletedCloseListeners = new ArrayList<>(closeListeners); closeListeners.clear();
closeListeners.clear(); return deletedCloseListeners;
return deletedCloseListeners;
}
} }
@Override @Override
public void setCloseListeners(List<CloseListener> listeners) { public void setCloseListeners(List<CloseListener> listeners) {
closeListeners.clear();
closeListeners.addAll(listeners); closeListeners.addAll(listeners);
} }
@ -119,19 +118,15 @@ public class MQTTConnection implements RemotingConnection {
@Override @Override
public List<FailureListener> removeFailureListeners() { public List<FailureListener> removeFailureListeners() {
synchronized (failureListeners) { List<FailureListener> deletedFailureListeners = copyFailureListeners();
List<FailureListener> deletedFailureListeners = new ArrayList<>(failureListeners); failureListeners.clear();
failureListeners.clear(); return deletedFailureListeners;
return deletedFailureListeners;
}
} }
@Override @Override
public void setFailureListeners(List<FailureListener> listeners) { public void setFailureListeners(List<FailureListener> listeners) {
synchronized (failureListeners) { failureListeners.clear();
failureListeners.clear(); failureListeners.addAll(listeners);
failureListeners.addAll(listeners);
}
} }
@Override @Override
@ -141,13 +136,20 @@ public class MQTTConnection implements RemotingConnection {
@Override @Override
public void fail(ActiveMQException me) { public void fail(ActiveMQException me) {
synchronized (failureListeners) { List<FailureListener> copy = copyFailureListeners();
for (FailureListener listener : failureListeners) { for (FailureListener listener : copy) {
listener.connectionFailed(me, false); listener.connectionFailed(me, false);
}
} }
} }
private List<FailureListener> copyFailureListeners() {
return new ArrayList<>(failureListeners);
}
private List<CloseListener> copyCloseListeners() {
return new ArrayList<>(closeListeners);
}
@Override @Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) { public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
synchronized (failureListeners) { synchronized (failureListeners) {