Prevent concurrent access to the MQTT protocol handlers which can lead
to a tansport level deadlock
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-07-05 17:47:49 +00:00
parent 30ff378a35
commit 96494f74c7
2 changed files with 25 additions and 4 deletions

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@ -36,6 +37,7 @@ import org.fusesource.mqtt.codec.MQTTFrame;
public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport, BrokerServiceAware {
protected ReentrantLock protocolLock = new ReentrantLock();
protected volatile MQTTProtocolConverter protocolConverter = null;
protected MQTTWireFormat wireFormat = new MQTTWireFormat();
protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this, wireFormat);
@ -53,16 +55,24 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT
@Override
public void oneway(Object command) throws IOException {
protocolLock.lock();
try {
getProtocolConverter().onActiveMQCommand((Command)command);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
} finally {
protocolLock.unlock();
}
}
@Override
public void sendToActiveMQ(Command command) {
doConsume(command);
protocolLock.lock();
try {
doConsume(command);
} finally {
protocolLock.unlock();
}
}
@Override

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty9;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.ws.AbstractMQTTSocket;
import org.apache.activemq.util.ByteSequence;
@ -33,6 +34,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
private final int ORDERLY_CLOSE_TIMEOUT = 10;
private Session session;
public MQTTSocket(String remoteAddress) {
@ -65,22 +67,31 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
}
}
receiveCounter += length;
protocolLock.lock();
try {
receiveCounter += length;
MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
getProtocolConverter().onMQTTCommand(frame);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
} finally {
protocolLock.unlock();
}
}
@Override
public void onWebSocketClose(int arg0, String arg1) {
try {
getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
LOG.debug("MQTT WebSocket closed: code[{}] message[{}]", arg0, arg1);
getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
}
} catch (Exception e) {
LOG.debug("Failed to close MQTT WebSocket cleanly", e);
} finally {
if (protocolLock.isHeldByCurrentThread()) {
protocolLock.unlock();
}
}
}