mirror of https://github.com/apache/activemq.git
AMQ-9432 Disable default Jetty WebSocket idle timeout
This commit is contained in:
parent
65a6b805a0
commit
2ddc3c0746
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.transport.ws;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
|
@ -143,6 +144,7 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList
|
|||
@Override
|
||||
public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) {
|
||||
this.connection = session;
|
||||
this.connection.setIdleTimeout(Duration.ZERO);
|
||||
this.connectLatch.countDown();
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -218,6 +219,7 @@ public final class WSTransportProxy extends TransportSupport implements Transpor
|
|||
@Override
|
||||
public void onWebSocketConnect(Session session) {
|
||||
this.session = session;
|
||||
this.session.setIdleTimeout(Duration.ZERO);
|
||||
|
||||
if (wsTransport.getMaxFrameSize() > 0) {
|
||||
this.session.getPolicy().setMaxBinaryMessageSize(wsTransport.getMaxFrameSize());
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty11;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -116,6 +117,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements MQTTCodec.MQTTFram
|
|||
@Override
|
||||
public void onWebSocketConnect(Session session) {
|
||||
this.session = session;
|
||||
this.session.setIdleTimeout(Duration.ZERO);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.transport.ws.jetty11;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.transport.stomp.Stomp;
|
||||
|
@ -86,6 +87,7 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene
|
|||
@Override
|
||||
public void onWebSocketConnect(Session session) {
|
||||
this.session = session;
|
||||
this.session.setIdleTimeout(Duration.ZERO);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Duration;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -282,6 +283,7 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe
|
|||
@Override
|
||||
public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) {
|
||||
this.connection = session;
|
||||
this.connection.setIdleTimeout(Duration.ZERO);
|
||||
this.connectLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -229,7 +229,7 @@ public class MQTTWSTransportTest extends WSTransportTestSupport {
|
|||
|
||||
TimeUnit.SECONDS.sleep(10);
|
||||
|
||||
assertTrue("Connection should still open", Wait.waitFor(new Wait.Condition() {
|
||||
assertTrue("Connection should still be open", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
|
@ -258,4 +258,57 @@ public class MQTTWSTransportTest extends WSTransportTestSupport {
|
|||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testNoDefaultJettyWebSocketIdleTimeout() throws Exception {
|
||||
|
||||
final AtomicBoolean done = new AtomicBoolean();
|
||||
|
||||
CONNECT command = new CONNECT();
|
||||
|
||||
command.clientId(new UTF8Buffer(UUID.randomUUID().toString()));
|
||||
command.cleanSession(false);
|
||||
command.version(3);
|
||||
command.keepAlive((short) 60);
|
||||
|
||||
wsMQTTConnection.sendFrame(command.encode());
|
||||
|
||||
MQTTFrame received = wsMQTTConnection.receive(15, TimeUnit.SECONDS);
|
||||
if (received == null || received.messageType() != CONNACK.TYPE) {
|
||||
fail("Client did not get expected CONNACK");
|
||||
}
|
||||
|
||||
assertTrue("Connection should open", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return getProxyToBroker().getCurrentConnectionsCount() == 1;
|
||||
}
|
||||
}));
|
||||
|
||||
TimeUnit.SECONDS.sleep(35);
|
||||
|
||||
assertTrue("Connection should still be open", getProxyToBroker().getCurrentConnectionsCount() == 1);
|
||||
|
||||
wsMQTTConnection.disconnect();
|
||||
wsMQTTConnection.close();
|
||||
|
||||
done.set(true);
|
||||
|
||||
assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return getProxyToBroker().getCurrentConnectionsCount() == 0;
|
||||
}
|
||||
}));
|
||||
|
||||
assertTrue("Client Connection should close", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return !wsMQTTConnection.isConnected();
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -341,4 +341,26 @@ public class StompWSTransportTest extends WSTransportTestSupport {
|
|||
LOG.info("Caught exception on write of disconnect", ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testNoDefaultJettyWebSocketIdleTimeout() throws Exception {
|
||||
String connectFrame = "STOMP\n" +
|
||||
"login:system\n" +
|
||||
"passcode:manager\n" +
|
||||
"accept-version:1.1\n" +
|
||||
"heart-beat:60000,0\n" +
|
||||
"host:localhost\n" +
|
||||
"\n" + Stomp.NULL;
|
||||
|
||||
wsStompConnection.sendRawFrame(connectFrame);
|
||||
String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
|
||||
assertTrue(incoming.startsWith("CONNECTED"));
|
||||
assertTrue(incoming.indexOf("version:1.1") >= 0);
|
||||
assertTrue(incoming.indexOf("heart-beat:") >= 0);
|
||||
assertTrue(incoming.indexOf("session:") >= 0);
|
||||
|
||||
TimeUnit.SECONDS.sleep(35);
|
||||
|
||||
wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue