From caa6b8e2539132999045c3fbbd627a2dc6d0885c Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 17 Nov 2015 11:03:31 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6046 Prevent concurrent access to protocol handlers which can lead to transport level deadlocks. --- .../transport/ws/AbstractStompSocket.java | 16 ++++++++++++++-- .../transport/ws/jetty9/StompSocket.java | 12 +++++++++++- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java index ebd949d6bf..b8e0f8f182 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws; import java.io.IOException; import java.security.cert.X509Certificate; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; import org.apache.activemq.command.Command; import org.apache.activemq.command.KeepAliveInfo; @@ -41,6 +42,7 @@ public abstract class AbstractStompSocket extends TransportSupport implements St private static final Logger LOG = LoggerFactory.getLogger(AbstractStompSocket.class); + protected ReentrantLock protocolLock = new ReentrantLock(); protected ProtocolConverter protocolConverter = new ProtocolConverter(this, null); protected StompWireFormat wireFormat = new StompWireFormat(); protected final CountDownLatch socketTransportStarted = new CountDownLatch(1); @@ -49,7 +51,6 @@ public abstract class AbstractStompSocket extends TransportSupport implements St protected final String remoteAddress; protected X509Certificate[] certificates; - public AbstractStompSocket(String remoteAddress) { super(); this.remoteAddress = remoteAddress; @@ -57,16 +58,24 @@ public abstract class AbstractStompSocket extends TransportSupport implements St @Override public void oneway(Object command) throws IOException { + protocolLock.lock(); try { protocolConverter.onActiveMQCommand((Command)command); } catch (Exception e) { onException(IOExceptionSupport.create(e)); + } finally { + protocolLock.unlock(); } } @Override public void sendToActiveMQ(Command command) { - doConsume(command); + protocolLock.lock(); + try { + doConsume(command); + } finally { + protocolLock.unlock(); + } } @Override @@ -129,6 +138,7 @@ public abstract class AbstractStompSocket extends TransportSupport implements St } } + protocolLock.lock(); try { if (data != null) { receiveCounter += data.length(); @@ -143,6 +153,8 @@ public abstract class AbstractStompSocket extends TransportSupport implements St } } catch (Exception e) { onException(IOExceptionSupport.create(e)); + } finally { + protocolLock.unlock(); } } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java index b7edcbe59f..a419e71dc1 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.ws.jetty9; import java.io.IOException; +import java.util.concurrent.TimeUnit; import org.apache.activemq.transport.stomp.Stomp; import org.apache.activemq.transport.stomp.StompFrame; @@ -33,6 +34,8 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class); + private final int ORDERLY_CLOSE_TIMEOUT = 10; + private Session session; public StompSocket(String remoteAddress) { @@ -60,9 +63,16 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene @Override public void onWebSocketClose(int arg0, String arg1) { try { - protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT)); + if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { + LOG.info("Stomp WebSocket closed: code[{}] message[{}]", arg0, arg1); + protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT)); + } } catch (Exception e) { LOG.warn("Failed to close WebSocket", e); + } finally { + if (protocolLock.isHeldByCurrentThread()) { + protocolLock.unlock(); + } } }