Adding websocket send timeout to AMQP over websockets

(cherry picked from commit 937b2acd4628fdbfe8165db5f97225dddab515e7)
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-11-04 14:09:26 -04:00
parent fdf1537eb8
commit e9489a45b3

View File

@ -237,7 +237,11 @@ public final class WSTransportProxy extends TransportSupport implements Transpor
} }
LOG.trace("WS Proxy sending string of size {} out", data.length()); LOG.trace("WS Proxy sending string of size {} out", data.length());
session.getRemote().sendString(data); try {
session.getRemote().sendStringByFuture(data).get(getDefaultSendTimeOut(), TimeUnit.SECONDS);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
} }
@Override @Override
@ -253,7 +257,11 @@ public final class WSTransportProxy extends TransportSupport implements Transpor
LOG.trace("WS Proxy sending {} bytes out", data.remaining()); LOG.trace("WS Proxy sending {} bytes out", data.remaining());
int limit = data.limit(); int limit = data.limit();
session.getRemote().sendBytes(data); try {
session.getRemote().sendBytesByFuture(data).get(getDefaultSendTimeOut(), TimeUnit.SECONDS);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
// Reset back to original limit and move position to match limit indicating // Reset back to original limit and move position to match limit indicating
// that we read everything, the websocket sender clears the passed buffer // that we read everything, the websocket sender clears the passed buffer
@ -267,4 +275,8 @@ public final class WSTransportProxy extends TransportSupport implements Transpor
private boolean transportStartedAtLeastOnce() { private boolean transportStartedAtLeastOnce() {
return socketTransportStarted.getCount() == 0; return socketTransportStarted.getCount() == 0;
} }
private static int getDefaultSendTimeOut() {
return Integer.getInteger("org.apache.activemq.transport.ws.WSTransportProxy.sendTimeout", 30);
}
} }