Adding a timeout for websocket sends to prevent the transport thread
from getting stuck and blocking.  The default is 30 seconds.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-11-04 12:54:39 -04:00
parent 946c9454d5
commit 450cabe4ea
2 changed files with 22 additions and 3 deletions

View File

@ -46,7 +46,13 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
@Override
public void sendToMQTT(MQTTFrame command) throws IOException {
ByteSequence bytes = wireFormat.marshal(command);
session.getRemote().sendBytes(ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength()));
try {
//timeout after a period of time so we don't wait forever and hold the protocol lock
session.getRemote().sendBytesByFuture(
ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength())).get(getDefaultSendTimeOut(), TimeUnit.SECONDS);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
}
@Override
@ -117,4 +123,8 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
@Override
public void onWebSocketText(String arg0) {
}
private static int getDefaultSendTimeOut() {
return Integer.getInteger("org.apache.activemq.transport.ws.MQTTSocket.sendTimeout", 30);
}
}

View File

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.transport.ws.AbstractStompSocket;
import org.apache.activemq.util.IOExceptionSupport;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.slf4j.Logger;
@ -44,8 +45,12 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene
@Override
public void sendToStomp(StompFrame command) throws IOException {
//Send async - do we need to wait for the future to complete?
session.getRemote().sendStringByFuture(command.format());
try {
//timeout after a period of time so we don't wait forever and hold the protocol lock
session.getRemote().sendStringByFuture(command.format()).get(getDefaultSendTimeOut(), TimeUnit.SECONDS);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
}
@Override
@ -90,4 +95,8 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene
public void onWebSocketText(String data) {
processStompFrame(data);
}
private static int getDefaultSendTimeOut() {
return Integer.getInteger("org.apache.activemq.transport.ws.StompSocket.sendTimeout", 30);
}
}