Prevent concurrent access to protocol handlers which can lead to
transport level deadlocks.
This commit is contained in:
Timothy Bish 2015-11-17 11:03:31 -05:00
parent 11ac3d9d85
commit caa6b8e253
2 changed files with 25 additions and 3 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.KeepAliveInfo;
@ -41,6 +42,7 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
private static final Logger LOG = LoggerFactory.getLogger(AbstractStompSocket.class);
protected ReentrantLock protocolLock = new ReentrantLock();
protected ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
protected StompWireFormat wireFormat = new StompWireFormat();
protected final CountDownLatch socketTransportStarted = new CountDownLatch(1);
@ -49,7 +51,6 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
protected final String remoteAddress;
protected X509Certificate[] certificates;
public AbstractStompSocket(String remoteAddress) {
super();
this.remoteAddress = remoteAddress;
@ -57,16 +58,24 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
@Override
public void oneway(Object command) throws IOException {
protocolLock.lock();
try {
protocolConverter.onActiveMQCommand((Command)command);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
} finally {
protocolLock.unlock();
}
}
@Override
public void sendToActiveMQ(Command command) {
doConsume(command);
protocolLock.lock();
try {
doConsume(command);
} finally {
protocolLock.unlock();
}
}
@Override
@ -129,6 +138,7 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
}
}
protocolLock.lock();
try {
if (data != null) {
receiveCounter += data.length();
@ -143,6 +153,8 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
}
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
} finally {
protocolLock.unlock();
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.ws.jetty9;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompFrame;
@ -33,6 +34,8 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene
private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class);
private final int ORDERLY_CLOSE_TIMEOUT = 10;
private Session session;
public StompSocket(String remoteAddress) {
@ -60,9 +63,16 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene
@Override
public void onWebSocketClose(int arg0, String arg1) {
try {
protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT));
if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
LOG.info("Stomp WebSocket closed: code[{}] message[{}]", arg0, arg1);
protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT));
}
} catch (Exception e) {
LOG.warn("Failed to close WebSocket", e);
} finally {
if (protocolLock.isHeldByCurrentThread()) {
protocolLock.unlock();
}
}
}