From e9489a45b382b292ed975331a0dd5233d250ac3f Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Fri, 4 Nov 2016 14:09:26 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6482 Adding websocket send timeout to AMQP over websockets (cherry picked from commit 937b2acd4628fdbfe8165db5f97225dddab515e7) --- .../activemq/transport/ws/WSTransportProxy.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java index 7d3ba18418..0ca80efd57 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java @@ -237,7 +237,11 @@ public final class WSTransportProxy extends TransportSupport implements Transpor } LOG.trace("WS Proxy sending string of size {} out", data.length()); - session.getRemote().sendString(data); + try { + session.getRemote().sendStringByFuture(data).get(getDefaultSendTimeOut(), TimeUnit.SECONDS); + } catch (Exception e) { + throw IOExceptionSupport.create(e); + } } @Override @@ -253,7 +257,11 @@ public final class WSTransportProxy extends TransportSupport implements Transpor LOG.trace("WS Proxy sending {} bytes out", data.remaining()); int limit = data.limit(); - session.getRemote().sendBytes(data); + try { + session.getRemote().sendBytesByFuture(data).get(getDefaultSendTimeOut(), TimeUnit.SECONDS); + } catch (Exception e) { + throw IOExceptionSupport.create(e); + } // Reset back to original limit and move position to match limit indicating // that we read everything, the websocket sender clears the passed buffer @@ -267,4 +275,8 @@ public final class WSTransportProxy extends TransportSupport implements Transpor private boolean transportStartedAtLeastOnce() { return socketTransportStarted.getCount() == 0; } + + private static int getDefaultSendTimeOut() { + return Integer.getInteger("org.apache.activemq.transport.ws.WSTransportProxy.sendTimeout", 30); + } }