From 1dfd0eeb60921243bec8245edb16c07ca98c1857 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Tue, 5 Jul 2016 17:47:49 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6346 Prevent concurrent access to the MQTT protocol handlers which can lead to a tansport level deadlock (cherry picked from commit 96494f74c7142c3396f17696f345c2355c16a61c) --- .../transport/ws/AbstractMQTTSocket.java | 12 +++++++++++- .../transport/ws/jetty9/MQTTSocket.java | 17 ++++++++++++++--- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java index 65d12c216f..c293811b6b 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws; import java.io.IOException; import java.security.cert.X509Certificate; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; @@ -34,6 +35,7 @@ import org.fusesource.mqtt.codec.MQTTFrame; public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport, BrokerServiceAware { + protected ReentrantLock protocolLock = new ReentrantLock(); protected volatile MQTTProtocolConverter protocolConverter = null; protected MQTTWireFormat wireFormat = new MQTTWireFormat(); protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this, wireFormat); @@ -50,16 +52,24 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT @Override public void oneway(Object command) throws IOException { + protocolLock.lock(); try { getProtocolConverter().onActiveMQCommand((Command)command); } catch (Exception e) { onException(IOExceptionSupport.create(e)); + } finally { + protocolLock.unlock(); } } @Override public void sendToActiveMQ(Command command) { - doConsume(command); + protocolLock.lock(); + try { + doConsume(command); + } finally { + protocolLock.unlock(); + } } @Override diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java index b2dd9be386..f19e4d229f 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty9; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; import org.apache.activemq.transport.ws.AbstractMQTTSocket; import org.apache.activemq.util.ByteSequence; @@ -33,6 +34,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class); + private final int ORDERLY_CLOSE_TIMEOUT = 10; private Session session; public MQTTSocket(String remoteAddress) { @@ -65,22 +67,31 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener } } - receiveCounter += length; - + protocolLock.lock(); try { + receiveCounter += length; MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length)); getProtocolConverter().onMQTTCommand(frame); } catch (Exception e) { onException(IOExceptionSupport.create(e)); + } finally { + protocolLock.unlock(); } } @Override public void onWebSocketClose(int arg0, String arg1) { try { - getProtocolConverter().onMQTTCommand(new DISCONNECT().encode()); + if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { + LOG.debug("MQTT WebSocket closed: code[{}] message[{}]", arg0, arg1); + getProtocolConverter().onMQTTCommand(new DISCONNECT().encode()); + } } catch (Exception e) { LOG.warn("Failed to close WebSocket", e); + } finally { + if (protocolLock.isHeldByCurrentThread()) { + protocolLock.unlock(); + } } }