diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java index 3d04847b33..8c0fccb1cc 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.ws; import java.io.IOException; +import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; @@ -143,6 +144,7 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList @Override public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) { this.connection = session; + this.connection.setIdleTimeout(Duration.ZERO); this.connectLatch.countDown(); } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java index b233394365..a8242d4edd 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws; import java.io.IOException; import java.nio.ByteBuffer; import java.security.cert.X509Certificate; +import java.time.Duration; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -218,6 +219,7 @@ public final class WSTransportProxy extends TransportSupport implements Transpor @Override public void onWebSocketConnect(Session session) { this.session = session; + this.session.setIdleTimeout(Duration.ZERO); if (wsTransport.getMaxFrameSize() > 0) { this.session.getPolicy().setMaxBinaryMessageSize(wsTransport.getMaxFrameSize()); diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/MQTTSocket.java index cbfd43113d..853c201c34 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/MQTTSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/MQTTSocket.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty11; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -116,6 +117,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements MQTTCodec.MQTTFram @Override public void onWebSocketConnect(Session session) { this.session = session; + this.session.setIdleTimeout(Duration.ZERO); } @Override diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/StompSocket.java index fb13bd2569..5c718c8ab0 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/StompSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/StompSocket.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.ws.jetty11; import java.io.IOException; +import java.time.Duration; import java.util.concurrent.TimeUnit; import org.apache.activemq.transport.stomp.Stomp; @@ -86,6 +87,7 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene @Override public void onWebSocketConnect(Session session) { this.session = session; + this.session.setIdleTimeout(Duration.ZERO); } @Override diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java index 73cc0e6a41..e127d077d0 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -282,6 +283,7 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe @Override public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) { this.connection = session; + this.connection.setIdleTimeout(Duration.ZERO); this.connectLatch.countDown(); } } diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java index 9dc04a61ae..671f77f64f 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java @@ -229,7 +229,7 @@ public class MQTTWSTransportTest extends WSTransportTestSupport { TimeUnit.SECONDS.sleep(10); - assertTrue("Connection should still open", Wait.waitFor(new Wait.Condition() { + assertTrue("Connection should still be open", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { @@ -258,4 +258,57 @@ public class MQTTWSTransportTest extends WSTransportTestSupport { } })); } + + @Test(timeout = 60000) + public void testNoDefaultJettyWebSocketIdleTimeout() throws Exception { + + final AtomicBoolean done = new AtomicBoolean(); + + CONNECT command = new CONNECT(); + + command.clientId(new UTF8Buffer(UUID.randomUUID().toString())); + command.cleanSession(false); + command.version(3); + command.keepAlive((short) 60); + + wsMQTTConnection.sendFrame(command.encode()); + + MQTTFrame received = wsMQTTConnection.receive(15, TimeUnit.SECONDS); + if (received == null || received.messageType() != CONNACK.TYPE) { + fail("Client did not get expected CONNACK"); + } + + assertTrue("Connection should open", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 1; + } + })); + + TimeUnit.SECONDS.sleep(35); + + assertTrue("Connection should still be open", getProxyToBroker().getCurrentConnectionsCount() == 1); + + wsMQTTConnection.disconnect(); + wsMQTTConnection.close(); + + done.set(true); + + assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + + assertTrue("Client Connection should close", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !wsMQTTConnection.isConnected(); + } + })); + } } diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java index cd17fc9d4b..40e3290ac3 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java @@ -341,4 +341,26 @@ public class StompWSTransportTest extends WSTransportTestSupport { LOG.info("Caught exception on write of disconnect", ex); } } + + @Test(timeout = 60000) + public void testNoDefaultJettyWebSocketIdleTimeout() throws Exception { + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.1\n" + + "heart-beat:60000,0\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + + wsStompConnection.sendRawFrame(connectFrame); + String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS); + assertTrue(incoming.startsWith("CONNECTED")); + assertTrue(incoming.indexOf("version:1.1") >= 0); + assertTrue(incoming.indexOf("heart-beat:") >= 0); + assertTrue(incoming.indexOf("session:") >= 0); + + TimeUnit.SECONDS.sleep(35); + + wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT)); + } }