From f7b212abb00eccdca03e54339e87bfe2d4f7664d Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Wed, 17 Jul 2019 14:34:51 -0500 Subject: [PATCH 1/5] Issue #3884 - Pure WebSocketFrameListener should not process Continuation events Signed-off-by: Joakim Erdfelt --- .../tests/server/FrameListenerTest.java | 202 ++++++++++++++++++ .../events/JettyListenerEventDriver.java | 9 + 2 files changed, 211 insertions(+) create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameListenerTest.java diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameListenerTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameListenerTest.java new file mode 100644 index 00000000000..b6587f7f3b0 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameListenerTest.java @@ -0,0 +1,202 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.server; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketFrameListener; +import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.api.util.WSURI; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.common.OpCode; +import org.eclipse.jetty.websocket.common.WebSocketSession; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class FrameListenerTest +{ + private Server server; + private FrameCreator frameCreator; + private WebSocketClient client; + + @BeforeEach + public void startServer() throws Exception + { + server = new Server(); + + ServerConnector connector = new ServerConnector(server); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + + ServletHolder closeEndpoint = new ServletHolder(new WebSocketServlet() + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.getPolicy().setIdleTimeout(SECONDS.toMillis(2)); + frameCreator = new FrameCreator(); + factory.setCreator(frameCreator); + } + }); + context.addServlet(closeEndpoint, "/ws"); + + HandlerList handlers = new HandlerList(); + handlers.addHandler(context); + handlers.addHandler(new DefaultHandler()); + + server.setHandler(handlers); + + server.start(); + } + + @AfterEach + public void stopServer() throws Exception + { + server.stop(); + } + + @BeforeEach + public void startClient() throws Exception + { + client = new WebSocketClient(); + client.start(); + } + + @AfterEach + public void stopClient() throws Exception + { + client.stop(); + } + + private void close(Session session) + { + if (session != null) + { + session.close(); + } + } + + @Test + public void testPartialText() throws Exception + { + ClientUpgradeRequest request = new ClientUpgradeRequest(); + CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint(); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + Future futSession = client.connect(clientEndpoint, wsUri, request); + + Session session = null; + try (StacklessLogging ignore = new StacklessLogging(WebSocketSession.class)) + { + session = futSession.get(5, SECONDS); + + RemoteEndpoint clientRemote = session.getRemote(); + clientRemote.sendPartialString("hello", false); + clientRemote.sendPartialString(" ", false); + clientRemote.sendPartialString("world", true); + + FrameEndpoint serverEndpoint = frameCreator.frameEndpoint; + + String event = serverEndpoint.frameEvents.poll(5, SECONDS); + assertThat("Event", event, is("FRAME[TEXT,fin=false,payload=hello,len=5]")); + event = serverEndpoint.frameEvents.poll(5, SECONDS); + assertThat("Event", event, is("FRAME[CONTINUATION,fin=false,payload= ,len=1]")); + event = serverEndpoint.frameEvents.poll(5, SECONDS); + assertThat("Event", event, is("FRAME[CONTINUATION,fin=true,payload=world,len=5]")); + } + finally + { + close(session); + } + } + + public static class FrameCreator implements WebSocketCreator + { + public FrameEndpoint frameEndpoint; + + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) + { + frameEndpoint = new FrameEndpoint(); + return frameEndpoint; + } + } + + public static class FrameEndpoint implements WebSocketFrameListener + { + public Session session; + public CountDownLatch closeLatch = new CountDownLatch(1); + public LinkedBlockingQueue frameEvents = new LinkedBlockingQueue<>(); + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + closeLatch.countDown(); + } + + @Override + public void onWebSocketConnect(Session session) + { + this.session = session; + } + + @Override + public void onWebSocketError(Throwable cause) + { + cause.printStackTrace(System.err); + } + + @Override + public void onWebSocketFrame(Frame frame) + { + frameEvents.offer(String.format("FRAME[%s,fin=%b,payload=%s,len=%d]", + OpCode.name(frame.getOpCode()), + frame.isFin(), + BufferUtil.toUTF8String(frame.getPayload()), + frame.getPayloadLength())); + } + } +} diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyListenerEventDriver.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyListenerEventDriver.java index 244460fadf8..8557edf2ca1 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyListenerEventDriver.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyListenerEventDriver.java @@ -190,6 +190,15 @@ public class JettyListenerEventDriver extends AbstractEventDriver } } + @Override + public void onContinuationFrame(ByteBuffer buffer, boolean fin) throws IOException + { + if (listener instanceof WebSocketListener) + { + super.onContinuationFrame(buffer, fin); + } + } + @Override public String toString() { From f4ce98b1c80f1b78367ddbba59da519ca645f81c Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Wed, 17 Jul 2019 15:51:35 -0500 Subject: [PATCH 2/5] Issue #3884 - Pure @OnWebSocketFrame usage should not process Continuation events Signed-off-by: Joakim Erdfelt --- .../tests/server/FrameAnnotationTest.java | 207 ++++++++++++++++++ .../events/JettyAnnotatedEventDriver.java | 9 + 2 files changed, 216 insertions(+) create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameAnnotationTest.java diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameAnnotationTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameAnnotationTest.java new file mode 100644 index 00000000000..4857134dbd0 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameAnnotationTest.java @@ -0,0 +1,207 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.server; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.api.util.WSURI; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.common.OpCode; +import org.eclipse.jetty.websocket.common.WebSocketSession; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class FrameAnnotationTest +{ + private Server server; + private FrameCreator frameCreator; + private WebSocketClient client; + + @BeforeEach + public void startServer() throws Exception + { + server = new Server(); + + ServerConnector connector = new ServerConnector(server); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + + ServletHolder closeEndpoint = new ServletHolder(new WebSocketServlet() + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.getPolicy().setIdleTimeout(SECONDS.toMillis(2)); + frameCreator = new FrameCreator(); + factory.setCreator(frameCreator); + } + }); + context.addServlet(closeEndpoint, "/ws"); + + HandlerList handlers = new HandlerList(); + handlers.addHandler(context); + handlers.addHandler(new DefaultHandler()); + + server.setHandler(handlers); + + server.start(); + } + + @AfterEach + public void stopServer() throws Exception + { + server.stop(); + } + + @BeforeEach + public void startClient() throws Exception + { + client = new WebSocketClient(); + client.start(); + } + + @AfterEach + public void stopClient() throws Exception + { + client.stop(); + } + + private void close(Session session) + { + if (session != null) + { + session.close(); + } + } + + @Test + public void testPartialText() throws Exception + { + ClientUpgradeRequest request = new ClientUpgradeRequest(); + CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint(); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + Future futSession = client.connect(clientEndpoint, wsUri, request); + + Session session = null; + try (StacklessLogging ignore = new StacklessLogging(WebSocketSession.class)) + { + session = futSession.get(5, SECONDS); + + RemoteEndpoint clientRemote = session.getRemote(); + clientRemote.sendPartialString("hello", false); + clientRemote.sendPartialString(" ", false); + clientRemote.sendPartialString("world", true); + + FrameEndpoint serverEndpoint = frameCreator.frameEndpoint; + + String event = serverEndpoint.frameEvents.poll(5, SECONDS); + assertThat("Event", event, is("FRAME[TEXT,fin=false,payload=hello,len=5]")); + event = serverEndpoint.frameEvents.poll(5, SECONDS); + assertThat("Event", event, is("FRAME[CONTINUATION,fin=false,payload= ,len=1]")); + event = serverEndpoint.frameEvents.poll(5, SECONDS); + assertThat("Event", event, is("FRAME[CONTINUATION,fin=true,payload=world,len=5]")); + } + finally + { + close(session); + } + } + + public static class FrameCreator implements WebSocketCreator + { + public FrameEndpoint frameEndpoint; + + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) + { + frameEndpoint = new FrameEndpoint(); + return frameEndpoint; + } + } + + @WebSocket + public static class FrameEndpoint + { + public Session session; + public CountDownLatch closeLatch = new CountDownLatch(1); + public LinkedBlockingQueue frameEvents = new LinkedBlockingQueue<>(); + + @OnWebSocketClose + public void onWebSocketClose(int statusCode, String reason) + { + closeLatch.countDown(); + } + + @OnWebSocketConnect + public void onWebSocketConnect(Session session) + { + this.session = session; + } + + @OnWebSocketError + public void onWebSocketError(Throwable cause) + { + cause.printStackTrace(System.err); + } + + @OnWebSocketFrame + public void onWebSocketFrame(Frame frame) + { + frameEvents.offer(String.format("FRAME[%s,fin=%b,payload=%s,len=%d]", + OpCode.name(frame.getOpCode()), + frame.isFin(), + BufferUtil.toUTF8String(frame.getPayload()), + frame.getPayloadLength())); + } + } +} diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java index b383132b567..b3c53794f9d 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java @@ -152,6 +152,15 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver } } + @Override + public void onContinuationFrame(ByteBuffer buffer, boolean fin) throws IOException + { + if (events.onText != null || events.onBinary != null) + { + super.onContinuationFrame(buffer, fin); + } + } + @Override public void onError(Throwable cause) { From 02c49efc5f5c9768ba5ad15349fe9565ba7333e0 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Thu, 18 Jul 2019 07:51:12 -0500 Subject: [PATCH 3/5] Issue #3884 - Adding testcase to satisfy PR review Signed-off-by: Joakim Erdfelt --- .../tests/client/WebSocketClientTest.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/WebSocketClientTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/WebSocketClientTest.java index 3b843c0e98f..29640bd53fe 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/WebSocketClientTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/WebSocketClientTest.java @@ -171,6 +171,39 @@ public class WebSocketClientTest } } + @Test + public void testBasicEcho_PartialUsage_FromClient() throws Exception + { + CloseTrackingEndpoint cliSock = new CloseTrackingEndpoint(); + + client.getPolicy().setIdleTimeout(10000); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/echo")); + ClientUpgradeRequest request = new ClientUpgradeRequest(); + request.setSubProtocols("echo"); + Future future = client.connect(cliSock, wsUri, request); + + try (Session sess = future.get(30, TimeUnit.SECONDS)) + { + assertThat("Session", sess, notNullValue()); + assertThat("Session.open", sess.isOpen(), is(true)); + assertThat("Session.upgradeRequest", sess.getUpgradeRequest(), notNullValue()); + assertThat("Session.upgradeResponse", sess.getUpgradeResponse(), notNullValue()); + + Collection sessions = client.getOpenSessions(); + assertThat("client.sessions.size", sessions.size(), is(1)); + + RemoteEndpoint remote = cliSock.getSession().getRemote(); + remote.sendPartialString("Hello", false); + remote.sendPartialString(" ", false); + remote.sendPartialString("World", true); + + // wait for response from server + String received = cliSock.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Message", received, containsString("Hello World")); + } + } + @Test public void testBasicEcho_UsingCallback() throws Exception { From f36358c631cf24fcf2797eec4ee33760b76fa60d Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Thu, 18 Jul 2019 08:03:16 -0500 Subject: [PATCH 4/5] Issue #3884 - Adding NullMessage to satisfy PR review + If an annotated Jetty WebSocket Endpoint doesn't have a handler for data types TEXT or BINARY then the new NullMessage (sink) is used for that specific data type to consume (quietly) those ignored Messages. Signed-off-by: Joakim Erdfelt --- .../tests/client/WebSocketClientTest.java | 38 +++++++++++++++++++ .../events/JettyAnnotatedEventDriver.java | 19 +++++----- .../websocket/common/message/NullMessage.java | 38 +++++++++++++++++++ 3 files changed, 86 insertions(+), 9 deletions(-) create mode 100644 jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/NullMessage.java diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/WebSocketClientTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/WebSocketClientTest.java index 29640bd53fe..611a06e6c55 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/WebSocketClientTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/WebSocketClientTest.java @@ -33,6 +33,7 @@ import javax.servlet.DispatcherType; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; @@ -204,6 +205,43 @@ public class WebSocketClientTest } } + @Test + public void testBasicEcho_PartialText_WithPartialBinary_FromClient() throws Exception + { + CloseTrackingEndpoint cliSock = new CloseTrackingEndpoint(); + + client.getPolicy().setIdleTimeout(10000); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/echo")); + ClientUpgradeRequest request = new ClientUpgradeRequest(); + request.setSubProtocols("echo"); + Future future = client.connect(cliSock, wsUri, request); + + try (Session sess = future.get(30, TimeUnit.SECONDS)) + { + assertThat("Session", sess, notNullValue()); + assertThat("Session.open", sess.isOpen(), is(true)); + assertThat("Session.upgradeRequest", sess.getUpgradeRequest(), notNullValue()); + assertThat("Session.upgradeResponse", sess.getUpgradeResponse(), notNullValue()); + + Collection sessions = client.getOpenSessions(); + assertThat("client.sessions.size", sessions.size(), is(1)); + + RemoteEndpoint remote = cliSock.getSession().getRemote(); + remote.sendPartialString("Hello", false); + remote.sendPartialString(" ", false); + remote.sendPartialString("World", true); + + remote.sendPartialBytes(BufferUtil.toBuffer("It's a big enough umbrella, "), false); + remote.sendPartialBytes(BufferUtil.toBuffer("but it's always me that "), false); + remote.sendPartialBytes(BufferUtil.toBuffer("ends up getting wet."), true); + + // wait for response from server + String received = cliSock.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Message", received, containsString("Hello World")); + } + } + @Test public void testBasicEcho_UsingCallback() throws Exception { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java index b3c53794f9d..6b1a9b2b650 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyAnnotatedEventDriver.java @@ -33,6 +33,7 @@ import org.eclipse.jetty.websocket.common.CloseInfo; import org.eclipse.jetty.websocket.common.message.MessageAppender; import org.eclipse.jetty.websocket.common.message.MessageInputStream; import org.eclipse.jetty.websocket.common.message.MessageReader; +import org.eclipse.jetty.websocket.common.message.NullMessage; import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage; import org.eclipse.jetty.websocket.common.message.SimpleTextMessage; @@ -84,6 +85,11 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver if (events.onBinary == null) { // not interested in binary events + if (activeMessage == null) + { + activeMessage = NullMessage.INSTANCE; + } + return; } @@ -152,15 +158,6 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver } } - @Override - public void onContinuationFrame(ByteBuffer buffer, boolean fin) throws IOException - { - if (events.onText != null || events.onBinary != null) - { - super.onContinuationFrame(buffer, fin); - } - } - @Override public void onError(Throwable cause) { @@ -207,6 +204,10 @@ public class JettyAnnotatedEventDriver extends AbstractEventDriver if (events.onText == null) { // not interested in text events + if (activeMessage == null) + { + activeMessage = NullMessage.INSTANCE; + } return; } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/NullMessage.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/NullMessage.java new file mode 100644 index 00000000000..0146c90b867 --- /dev/null +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/NullMessage.java @@ -0,0 +1,38 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common.message; + +import java.nio.ByteBuffer; + +public class NullMessage implements MessageAppender +{ + public static final MessageAppender INSTANCE = new NullMessage(); + + @Override + public void appendFrame(ByteBuffer framePayload, boolean isLast) + { + // consume payload + framePayload.position(framePayload.limit()); + } + + @Override + public void messageComplete() + { + } +} From b8f29630eddd9575ad142dc9821c54a2d26f16c0 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Mon, 5 Aug 2019 12:52:20 -0500 Subject: [PATCH 5/5] Issue #3884 - Updating tests and fixing merge + Test cases have been updated based on PR review + Fixing merge from `jetty-9.4.x` that caused a duplicate JettyListenerEventDriver.onContinuationFrame() method Signed-off-by: Joakim Erdfelt --- .../tests/CloseTrackingEndpoint.java | 9 ++++++++ .../jetty/websocket/tests/EchoSocket.java | 7 ++++++ .../tests/client/WebSocketClientTest.java | 17 +++++++++++--- .../tests/server/FrameListenerTest.java | 23 +++---------------- .../events/JettyListenerEventDriver.java | 6 ----- 5 files changed, 33 insertions(+), 29 deletions(-) diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/CloseTrackingEndpoint.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/CloseTrackingEndpoint.java index 43829920e37..b8a126d4c26 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/CloseTrackingEndpoint.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/CloseTrackingEndpoint.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.websocket.tests; import java.lang.reflect.Field; +import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -51,6 +52,7 @@ public class CloseTrackingEndpoint extends WebSocketAdapter public CountDownLatch errorLatch = new CountDownLatch(1); public LinkedBlockingQueue messageQueue = new LinkedBlockingQueue<>(); + public LinkedBlockingQueue binaryMessageQueue = new LinkedBlockingQueue<>(); public AtomicReference error = new AtomicReference<>(); public void assertReceivedCloseEvent(int clientTimeoutMs, Matcher statusCodeMatcher) @@ -110,6 +112,13 @@ public class CloseTrackingEndpoint extends WebSocketAdapter messageQueue.offer(message); } + @Override + public void onWebSocketBinary(byte[] payload, int offset, int len) + { + LOG.debug("onWebSocketBinary({},offset,len)", payload, offset, len); + binaryMessageQueue.offer(ByteBuffer.wrap(payload, offset, len)); + } + public EndPoint getEndPoint() throws Exception { Session session = getSession(); diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/EchoSocket.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/EchoSocket.java index ce2d9f651af..a0b8ee62c02 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/EchoSocket.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/EchoSocket.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.websocket.tests; import java.io.IOException; +import java.nio.ByteBuffer; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; @@ -33,4 +34,10 @@ public class EchoSocket { session.getRemote().sendString(msg); } + + @OnWebSocketMessage + public void onBinaryMessage(Session session, byte[] data, int offset, int len) throws IOException + { + session.getRemote().sendBytes(ByteBuffer.wrap(data, offset, len)); + } } diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/WebSocketClientTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/WebSocketClientTest.java index 611a06e6c55..7863c93744a 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/WebSocketClientTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/WebSocketClientTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.tests.client; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; @@ -232,13 +233,23 @@ public class WebSocketClientTest remote.sendPartialString(" ", false); remote.sendPartialString("World", true); - remote.sendPartialBytes(BufferUtil.toBuffer("It's a big enough umbrella, "), false); - remote.sendPartialBytes(BufferUtil.toBuffer("but it's always me that "), false); - remote.sendPartialBytes(BufferUtil.toBuffer("ends up getting wet."), true); + String[] parts = { + "The difference between the right word ", + "and the almost right word is the difference ", + "between lightning and a lightning bug." + }; + + remote.sendPartialBytes(BufferUtil.toBuffer(parts[0]), false); + remote.sendPartialBytes(BufferUtil.toBuffer(parts[1]), false); + remote.sendPartialBytes(BufferUtil.toBuffer(parts[2]), true); // wait for response from server String received = cliSock.messageQueue.poll(5, TimeUnit.SECONDS); assertThat("Message", received, containsString("Hello World")); + + ByteBuffer bufReceived = cliSock.binaryMessageQueue.poll(5, TimeUnit.SECONDS); + received = BufferUtil.toUTF8String(bufReceived.slice()); + assertThat("Message", received, containsString(parts[0] + parts[1] + parts[2])); } } diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameListenerTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameListenerTest.java index b6587f7f3b0..1aa131d54b9 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameListenerTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameListenerTest.java @@ -40,9 +40,6 @@ import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.WebSocketSession; -import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; -import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; -import org.eclipse.jetty.websocket.servlet.WebSocketCreator; import org.eclipse.jetty.websocket.servlet.WebSocketServlet; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint; @@ -57,7 +54,7 @@ import static org.hamcrest.Matchers.is; public class FrameListenerTest { private Server server; - private FrameCreator frameCreator; + private FrameEndpoint serverEndpoint; private WebSocketClient client; @BeforeEach @@ -77,8 +74,8 @@ public class FrameListenerTest public void configure(WebSocketServletFactory factory) { factory.getPolicy().setIdleTimeout(SECONDS.toMillis(2)); - frameCreator = new FrameCreator(); - factory.setCreator(frameCreator); + serverEndpoint = new FrameEndpoint(); + factory.setCreator((req, resp) -> serverEndpoint); } }); context.addServlet(closeEndpoint, "/ws"); @@ -138,8 +135,6 @@ public class FrameListenerTest clientRemote.sendPartialString(" ", false); clientRemote.sendPartialString("world", true); - FrameEndpoint serverEndpoint = frameCreator.frameEndpoint; - String event = serverEndpoint.frameEvents.poll(5, SECONDS); assertThat("Event", event, is("FRAME[TEXT,fin=false,payload=hello,len=5]")); event = serverEndpoint.frameEvents.poll(5, SECONDS); @@ -153,18 +148,6 @@ public class FrameListenerTest } } - public static class FrameCreator implements WebSocketCreator - { - public FrameEndpoint frameEndpoint; - - @Override - public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) - { - frameEndpoint = new FrameEndpoint(); - return frameEndpoint; - } - } - public static class FrameEndpoint implements WebSocketFrameListener { public Session session; diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyListenerEventDriver.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyListenerEventDriver.java index 1d86a5a2ad5..1f4fa3b2e62 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyListenerEventDriver.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/JettyListenerEventDriver.java @@ -250,12 +250,6 @@ public class JettyListenerEventDriver extends AbstractEventDriver return; } - super.onContinuationFrame(buffer, fin); - } - - @Override - public void onContinuationFrame(ByteBuffer buffer, boolean fin) throws IOException - { if (listener instanceof WebSocketListener) { super.onContinuationFrame(buffer, fin);