Adding a timeout for websocket sends to prevent the transport thread
from getting stuck and blocking.  The default is 30 seconds.

(cherry picked from commit 450cabe4ead1fb78eec2e94013d2868a5bf864da)
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-11-03 08:41:37 -04:00
parent fd3853c24d
commit fdf1537eb8
2 changed files with 22 additions and 2 deletions

View File

@ -46,7 +46,13 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
@Override
public void sendToMQTT(MQTTFrame command) throws IOException {
ByteSequence bytes = wireFormat.marshal(command);
session.getRemote().sendBytes(ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength()));
try {
//timeout after a period of time so we don't wait forever and hold the protocol lock
session.getRemote().sendBytesByFuture(
ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength())).get(getDefaultSendTimeOut(), TimeUnit.SECONDS);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
}
@Override
@ -117,4 +123,8 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
@Override
public void onWebSocketText(String arg0) {
}
private static int getDefaultSendTimeOut() {
return Integer.getInteger("org.apache.activemq.transport.ws.MQTTSocket.sendTimeout", 30);
}
}

View File

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.transport.ws.AbstractStompSocket;
import org.apache.activemq.util.IOExceptionSupport;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.slf4j.Logger;
@ -44,7 +45,12 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene
@Override
public void sendToStomp(StompFrame command) throws IOException {
session.getRemote().sendString(command.format());
try {
//timeout after a period of time so we don't wait forever and hold the protocol lock
session.getRemote().sendStringByFuture(command.format()).get(getDefaultSendTimeOut(), TimeUnit.SECONDS);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
}
@Override
@ -89,4 +95,8 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene
public void onWebSocketText(String data) {
processStompFrame(data);
}
private static int getDefaultSendTimeOut() {
return Integer.getInteger("org.apache.activemq.transport.ws.StompSocket.sendTimeout", 30);
}
}