Fix incorrect reliance on autoDemand when handlers are not configured (#11304)

#11303 fix incorrect reliance on autoDemand when handlers are not configured

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2024-01-24 09:11:25 +01:00 committed by GitHub
parent 676d76c1ff
commit aa3873fb2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 120 additions and 12 deletions

View File

@ -145,15 +145,16 @@ public class JettyWebSocketFrameHandler implements FrameHandler
container.notifySessionListeners((listener) -> listener.onWebSocketSessionOpened(session));
callback.succeeded();
if (openHandle != null)
autoDemand();
else
internalDemand();
}
catch (Throwable cause)
{
callback.failed(new WebSocketException(endpointInstance.getClass().getSimpleName() + " OPEN method error: " + cause.getMessage(), cause));
}
finally
{
autoDemand();
}
}
private static MessageSink createMessageSink(Class<? extends MessageSink> sinkClass, WebSocketSession session, MethodHandle msgHandle, boolean autoDemanding)
@ -320,7 +321,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
public void succeed()
{
callback.succeeded();
autoDemand();
internalDemand();
}
@Override
@ -328,6 +329,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
{
// Ignore failures, we might be output closed and receive a PING.
callback.succeeded();
internalDemand();
}
});
}
@ -355,7 +357,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
}
else
{
autoDemand();
internalDemand();
}
}
@ -384,7 +386,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
if (activeMessageSink == null)
{
callback.succeeded();
autoDemand();
internalDemand();
return;
}
@ -403,7 +405,12 @@ public class JettyWebSocketFrameHandler implements FrameHandler
private void autoDemand()
{
if (isAutoDemand())
session.getCoreSession().demand();
internalDemand();
}
private void internalDemand()
{
session.getCoreSession().demand();
}
public String toString()

View File

@ -17,6 +17,11 @@
</properties>
<dependencies>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-alpn-java-server</artifactId>

View File

@ -15,12 +15,17 @@ package org.eclipse.jetty.websocket.tests;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Frame;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
@ -29,6 +34,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -55,9 +61,45 @@ public class ExplicitDemandTest
}
}
@WebSocket(autoDemand = false)
public static class ListenerSocket implements Session.Listener
{
final List<Frame> frames = new CopyOnWriteArrayList<>();
@Override
public void onWebSocketFrame(Frame frame, Callback callback)
{
frames.add(frame);
callback.succeed();
}
}
@WebSocket(autoDemand = false)
public static class PingSocket extends ListenerSocket
{
Session session;
@Override
public void onWebSocketOpen(Session session)
{
this.session = session;
session.demand();
}
@Override
public void onWebSocketFrame(Frame frame, Callback callback)
{
super.onWebSocketFrame(frame, callback);
if (frame.getType() == Frame.Type.TEXT)
session.sendPing(ByteBuffer.wrap("server-ping".getBytes(StandardCharsets.UTF_8)), Callback.NOOP);
}
}
private final Server server = new Server();
private final WebSocketClient client = new WebSocketClient();
private final SuspendSocket serverSocket = new SuspendSocket();
private final ListenerSocket listenerSocket = new ListenerSocket();
private final PingSocket pingSocket = new PingSocket();
private ServerConnector connector;
@BeforeEach
@ -67,7 +109,11 @@ public class ExplicitDemandTest
server.addConnector(connector);
WebSocketUpgradeHandler wsHandler = WebSocketUpgradeHandler.from(server, container ->
container.addMapping("/suspend", (rq, rs, cb) -> serverSocket));
{
container.addMapping("/suspend", (rq, rs, cb) -> serverSocket);
container.addMapping("/listenerSocket", (rq, rs, cb) -> listenerSocket);
container.addMapping("/ping", (rq, rs, cb) -> pingSocket);
});
server.setHandler(wsHandler);
server.start();
@ -114,4 +160,57 @@ public class ExplicitDemandTest
assertNull(clientSocket.error);
assertNull(serverSocket.error);
}
@Test
public void testNoAutoDemand() throws Exception
{
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/listenerSocket");
ListenerSocket listenerSocket = new ListenerSocket();
Future<Session> connect = client.connect(listenerSocket, uri);
Session session = connect.get(5, TimeUnit.SECONDS);
session.sendPing(ByteBuffer.wrap("ping-0".getBytes(StandardCharsets.UTF_8)), Callback.NOOP);
session.sendText("test-text", Callback.NOOP);
session.sendPing(ByteBuffer.wrap("ping-1".getBytes(StandardCharsets.UTF_8)), Callback.NOOP);
await().atMost(5, TimeUnit.SECONDS).until(listenerSocket.frames::size, is(2));
Frame frame0 = listenerSocket.frames.get(0);
assertThat(frame0.getType(), is(Frame.Type.PONG));
assertThat(StandardCharsets.UTF_8.decode(frame0.getPayload()).toString(), is("ping-0"));
Frame frame1 = listenerSocket.frames.get(1);
assertThat(frame1.getType(), is(Frame.Type.PONG));
assertThat(StandardCharsets.UTF_8.decode(frame1.getPayload()).toString(), is("ping-1"));
session.close();
await().atMost(5, TimeUnit.SECONDS).until(listenerSocket.frames::size, is(3));
assertThat(listenerSocket.frames.get(2).getType(), is(Frame.Type.CLOSE));
}
@Test
public void testServerPing() throws Exception
{
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/ping");
PingSocket pingSocket = new PingSocket();
Future<Session> connect = client.connect(pingSocket, uri);
Session session = connect.get(5, TimeUnit.SECONDS);
session.sendText("send-me-a-ping", Callback.NOOP);
await().atMost(5, TimeUnit.SECONDS).until(pingSocket.frames::size, is(1));
Frame frame = pingSocket.frames.get(0);
assertThat(frame.getType(), is(Frame.Type.PING));
assertThat(StandardCharsets.UTF_8.decode(frame.getPayload()).toString(), is("server-ping"));
session.sendText("send-me-another-ping", Callback.NOOP);
await().atMost(5, TimeUnit.SECONDS).until(pingSocket.frames::size, is(2));
frame = pingSocket.frames.get(1);
assertThat(frame.getType(), is(Frame.Type.PING));
assertThat(StandardCharsets.UTF_8.decode(frame.getPayload()).toString(), is("server-ping"));
session.close();
await().atMost(5, TimeUnit.SECONDS).until(pingSocket.frames::size, is(3));
frame = pingSocket.frames.get(2);
assertThat(frame.getType(), is(Frame.Type.CLOSE));
}
}

View File

@ -129,14 +129,12 @@ public class FrameListenerTest
public static class FrameEndpoint implements Session.Listener
{
public Session session;
public CountDownLatch closeLatch = new CountDownLatch(1);
public LinkedBlockingQueue<String> frameEvents = new LinkedBlockingQueue<>();
@Override
public void onWebSocketOpen(Session session)
{
this.session = session;
session.demand();
}
@ -149,7 +147,6 @@ public class FrameListenerTest
BufferUtil.toUTF8String(frame.getPayload()),
frame.getPayloadLength()));
callback.succeed();
session.demand();
}
@Override