AMQ-9432 Disable default Jetty WebSocket idle timeout

(cherry picked from commit 2ddc3c0746)
This commit is contained in:
Albertas Vyšniauskas 2024-02-16 13:01:28 +02:00 committed by JB Onofré
parent ab5dc265f8
commit b2a808466d
7 changed files with 86 additions and 1 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.ws;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
@ -143,6 +144,7 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList
@Override
public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) {
this.connection = session;
this.connection.setIdleTimeout(Duration.ZERO);
this.connectLatch.countDown();
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -218,6 +219,7 @@ public final class WSTransportProxy extends TransportSupport implements Transpor
@Override
public void onWebSocketConnect(Session session) {
this.session = session;
this.session.setIdleTimeout(Duration.ZERO);
if (wsTransport.getMaxFrameSize() > 0) {
this.session.getPolicy().setMaxBinaryMessageSize(wsTransport.getMaxFrameSize());

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty11;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -116,6 +117,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements MQTTCodec.MQTTFram
@Override
public void onWebSocketConnect(Session session) {
this.session = session;
this.session.setIdleTimeout(Duration.ZERO);
}
@Override

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.ws.jetty11;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.stomp.Stomp;
@ -86,6 +87,7 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene
@Override
public void onWebSocketConnect(Session session) {
this.session = session;
this.session.setIdleTimeout(Duration.ZERO);
}
@Override

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
@ -282,6 +283,7 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe
@Override
public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) {
this.connection = session;
this.connection.setIdleTimeout(Duration.ZERO);
this.connectLatch.countDown();
}
}

View File

@ -229,7 +229,7 @@ public class MQTTWSTransportTest extends WSTransportTestSupport {
TimeUnit.SECONDS.sleep(10);
assertTrue("Connection should still open", Wait.waitFor(new Wait.Condition() {
assertTrue("Connection should still be open", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
@ -258,4 +258,57 @@ public class MQTTWSTransportTest extends WSTransportTestSupport {
}
}));
}
@Test(timeout = 60000)
public void testNoDefaultJettyWebSocketIdleTimeout() throws Exception {
final AtomicBoolean done = new AtomicBoolean();
CONNECT command = new CONNECT();
command.clientId(new UTF8Buffer(UUID.randomUUID().toString()));
command.cleanSession(false);
command.version(3);
command.keepAlive((short) 60);
wsMQTTConnection.sendFrame(command.encode());
MQTTFrame received = wsMQTTConnection.receive(15, TimeUnit.SECONDS);
if (received == null || received.messageType() != CONNACK.TYPE) {
fail("Client did not get expected CONNACK");
}
assertTrue("Connection should open", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 1;
}
}));
TimeUnit.SECONDS.sleep(35);
assertTrue("Connection should still be open", getProxyToBroker().getCurrentConnectionsCount() == 1);
wsMQTTConnection.disconnect();
wsMQTTConnection.close();
done.set(true);
assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 0;
}
}));
assertTrue("Client Connection should close", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return !wsMQTTConnection.isConnected();
}
}));
}
}

View File

@ -341,4 +341,26 @@ public class StompWSTransportTest extends WSTransportTestSupport {
LOG.info("Caught exception on write of disconnect", ex);
}
}
@Test(timeout = 60000)
public void testNoDefaultJettyWebSocketIdleTimeout() throws Exception {
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
"accept-version:1.1\n" +
"heart-beat:60000,0\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
wsStompConnection.sendRawFrame(connectFrame);
String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
assertTrue(incoming.startsWith("CONNECTED"));
assertTrue(incoming.indexOf("version:1.1") >= 0);
assertTrue(incoming.indexOf("heart-beat:") >= 0);
assertTrue(incoming.indexOf("session:") >= 0);
TimeUnit.SECONDS.sleep(35);
wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
}
}