mirror of https://github.com/apache/activemq.git
Prevent concurrent access to the MQTT protocol handlers which can lead
to a tansport level deadlock
(cherry picked from commit 96494f74c7
)
This commit is contained in:
parent
7ddfa97d01
commit
1dfd0eeb60
|
@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws;
|
|||
import java.io.IOException;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.BrokerServiceAware;
|
||||
|
@ -34,6 +35,7 @@ import org.fusesource.mqtt.codec.MQTTFrame;
|
|||
|
||||
public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport, BrokerServiceAware {
|
||||
|
||||
protected ReentrantLock protocolLock = new ReentrantLock();
|
||||
protected volatile MQTTProtocolConverter protocolConverter = null;
|
||||
protected MQTTWireFormat wireFormat = new MQTTWireFormat();
|
||||
protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this, wireFormat);
|
||||
|
@ -50,16 +52,24 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT
|
|||
|
||||
@Override
|
||||
public void oneway(Object command) throws IOException {
|
||||
protocolLock.lock();
|
||||
try {
|
||||
getProtocolConverter().onActiveMQCommand((Command)command);
|
||||
} catch (Exception e) {
|
||||
onException(IOExceptionSupport.create(e));
|
||||
} finally {
|
||||
protocolLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendToActiveMQ(Command command) {
|
||||
doConsume(command);
|
||||
protocolLock.lock();
|
||||
try {
|
||||
doConsume(command);
|
||||
} finally {
|
||||
protocolLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty9;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.transport.ws.AbstractMQTTSocket;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
|
@ -33,6 +34,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
|
||||
|
||||
private final int ORDERLY_CLOSE_TIMEOUT = 10;
|
||||
private Session session;
|
||||
|
||||
public MQTTSocket(String remoteAddress) {
|
||||
|
@ -65,22 +67,31 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
|
|||
}
|
||||
}
|
||||
|
||||
receiveCounter += length;
|
||||
|
||||
protocolLock.lock();
|
||||
try {
|
||||
receiveCounter += length;
|
||||
MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
|
||||
getProtocolConverter().onMQTTCommand(frame);
|
||||
} catch (Exception e) {
|
||||
onException(IOExceptionSupport.create(e));
|
||||
} finally {
|
||||
protocolLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketClose(int arg0, String arg1) {
|
||||
try {
|
||||
getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
|
||||
if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
|
||||
LOG.debug("MQTT WebSocket closed: code[{}] message[{}]", arg0, arg1);
|
||||
getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to close WebSocket", e);
|
||||
} finally {
|
||||
if (protocolLock.isHeldByCurrentThread()) {
|
||||
protocolLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue