diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketCoreSession.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketCoreSession.java index 8b7c4c8813d..461c41c57d0 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketCoreSession.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketCoreSession.java @@ -422,10 +422,6 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab { if (!demanding) throw new IllegalStateException("FrameHandler is not demanding: " + this); - - if (!sessionState.isInputOpen()) - throw new IllegalStateException("FrameHandler input not open: " + this); - connection.demand(n); } diff --git a/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandTest.java b/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandTest.java new file mode 100644 index 00000000000..0f50b666e0e --- /dev/null +++ b/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandTest.java @@ -0,0 +1,148 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.websocket.core; + +import java.net.URI; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FutureCallback; +import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; +import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator; +import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class DemandTest +{ + Server _server; + ServerConnector _connector; + WebSocketCoreClient _client; + + @BeforeEach + public void before() throws Exception + { + _server = new Server(); + _connector = new ServerConnector(_server); + _server.addConnector(_connector); + + WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(); + _server.setHandler(upgradeHandler); + upgradeHandler.addMapping("/", WebSocketNegotiator.from((neg) -> new EchoFrameHandler())); + _server.start(); + + _client = new WebSocketCoreClient(); + _client.start(); + } + + @AfterEach + public void after() throws Exception + { + _client.stop(); + _server.stop(); + } + + public static class AbstractFrameHandler implements FrameHandler + { + protected CoreSession _coreSession; + + @Override + public void onOpen(CoreSession coreSession, Callback callback) + { + _coreSession = coreSession; + callback.succeeded(); + coreSession.demand(1); + } + + @Override + public void onFrame(Frame frame, Callback callback) + { + callback.succeeded(); + _coreSession.demand(1); + } + + @Override + public void onError(Throwable cause, Callback callback) + { + callback.succeeded(); + _coreSession.demand(1); + } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + callback.succeeded(); + } + + @Override + public boolean isDemanding() + { + return true; + } + } + + @Test + public void testDemandAfterClose() throws Exception + { + CountDownLatch closed = new CountDownLatch(1); + CompletableFuture errorFuture = new CompletableFuture<>(); + AbstractFrameHandler frameHandler = new AbstractFrameHandler() + { + @Override + public void onFrame(Frame frame, Callback callback) + { + try + { + // Fail the core session so it is completely closed. + FutureCallback futureCallback = new FutureCallback(); + _coreSession.close(CloseStatus.BAD_PAYLOAD, "bad data", futureCallback); + futureCallback.block(); + _coreSession.abort(); + + // Demand should not throw even if closed. + _coreSession.demand(1); + errorFuture.complete(null); + } + catch (Throwable t) + { + errorFuture.complete(t); + } + } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + super.onClosed(closeStatus, callback); + closed.countDown(); + } + }; + + URI uri = URI.create("ws://localhost:" + _connector.getLocalPort()); + CoreSession coreSession = _client.connect(frameHandler, uri).get(5, TimeUnit.SECONDS); + coreSession.sendFrame(new Frame(OpCode.TEXT, "hello world"), Callback.NOOP, false); + assertTrue(closed.await(5, TimeUnit.SECONDS)); + + // There should be no error from the frame handler. + Throwable error = errorFuture.get(5, TimeUnit.SECONDS); + assertNull(error); + } +} diff --git a/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java b/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java index 1d3b137b1e3..a6750c9c240 100644 --- a/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java +++ b/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java @@ -33,7 +33,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -157,22 +158,22 @@ public class WebSocketOpenTest extends WebSocketTester Parser.ParsedFrame frame = receiveFrame(client.getInputStream()); assertThat(frame.getPayloadAsUTF8(), is("Hello")); - // But cannot receive - client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true)); - assertFalse(serverHandler.closeLatch.await(1, TimeUnit.SECONDS)); - - // Can't demand until open - assertThrows(Throwable.class, () -> coreSession.demand(1)); - client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true)); - assertFalse(serverHandler.closeLatch.await(1, TimeUnit.SECONDS)); - - // Succeeded moves to OPEN state and still does not read CLOSE frame + // Succeeded moves to OPEN state. onOpenCallback.succeeded(); assertThat(coreSession.toString(), containsString("OPEN")); - // Demand start receiving frames + // Demanding in onOpen will allow you to receive frames. + client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.TEXT, "message in onOpen", true)); + assertNull(serverHandler.receivedFrames.poll(1, TimeUnit.SECONDS)); coreSession.demand(1); + Frame rcvFrame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS); + assertNotNull(rcvFrame); + assertThat(rcvFrame.getPayloadAsUTF8(), is("message in onOpen")); + + // Demand to receive the close frame. client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true)); + assertFalse(serverHandler.closeLatch.await(1, TimeUnit.SECONDS)); + coreSession.demand(1); assertTrue(serverHandler.closeLatch.await(5, TimeUnit.SECONDS)); // Closed handled normally diff --git a/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java b/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java index 24fb6d45167..0b222e5bcac 100644 --- a/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java +++ b/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java @@ -584,6 +584,7 @@ public class JavaxWebSocketFrameHandler implements FrameHandler if (activeMessageSink == null) { callback.succeeded(); + coreSession.demand(1); return; } diff --git a/jetty-websocket/websocket-javax-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/SingleMessageHandlerTest.java b/jetty-websocket/websocket-javax-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/SingleMessageHandlerTest.java new file mode 100644 index 00000000000..a97818fae02 --- /dev/null +++ b/jetty-websocket/websocket-javax-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/SingleMessageHandlerTest.java @@ -0,0 +1,139 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.websocket.javax.tests; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import javax.websocket.OnMessage; +import javax.websocket.Session; +import javax.websocket.server.ServerEndpoint; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.websocket.javax.client.internal.JavaxWebSocketClientContainer; +import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SingleMessageHandlerTest +{ + private static final LinkedBlockingQueue BINARY_MESSAGES = new LinkedBlockingQueue<>(); + private static final LinkedBlockingQueue TEXT_MESSAGES = new LinkedBlockingQueue<>(); + + private Server _server; + private ServerConnector _connector; + private JavaxWebSocketClientContainer _client; + + @BeforeEach + public void before() throws Exception + { + _server = new Server(); + _connector = new ServerConnector(_server); + _server.addConnector(_connector); + + ServletContextHandler contextHandler = new ServletContextHandler(); + JavaxWebSocketServletContainerInitializer.configure(contextHandler, ((servletContext, serverContainer) -> + { + serverContainer.addEndpoint(TextEndpoint.class); + serverContainer.addEndpoint(BinaryEndpoint.class); + })); + _server.setHandler(contextHandler); + + _server.start(); + _client = new JavaxWebSocketClientContainer(); + _client.start(); + } + + @ServerEndpoint("/binary") + public static class BinaryEndpoint + { + @OnMessage + public void onMessage(ByteBuffer message) + { + BINARY_MESSAGES.add(message); + } + } + + @ServerEndpoint("/text") + public static class TextEndpoint + { + @OnMessage + public void onMessage(String message) + { + TEXT_MESSAGES.add(message); + } + } + + @AfterEach + public void after() throws Exception + { + _client.stop(); + _server.stop(); + } + + @Test + public void testBinary() throws Exception + { + URI uri = URI.create("ws://localhost:" + _connector.getLocalPort() + "/binary"); + EventSocket eventSocket = new EventSocket(); + Session session = _client.connectToServer(eventSocket, uri); + + // Can send/receive binary message successfully. + ByteBuffer binaryMessage = BufferUtil.toBuffer("hello world"); + session.getBasicRemote().sendBinary(binaryMessage); + assertThat(BINARY_MESSAGES.poll(5, TimeUnit.SECONDS), equalTo(binaryMessage)); + + // Text message is discarded by implementation. + session.getBasicRemote().sendText("hello world"); + + // Next binary message is still received. + session.getBasicRemote().sendBinary(binaryMessage); + assertThat(BINARY_MESSAGES.poll(5, TimeUnit.SECONDS), equalTo(binaryMessage)); + + session.close(); + assertTrue(eventSocket.closeLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testText() throws Exception + { + URI uri = URI.create("ws://localhost:" + _connector.getLocalPort() + "/text"); + EventSocket eventSocket = new EventSocket(); + Session session = _client.connectToServer(eventSocket, uri); + + // Can send/receive text message successfully. + String textMessage = "hello world"; + session.getBasicRemote().sendText(textMessage); + assertThat(TEXT_MESSAGES.poll(5, TimeUnit.SECONDS), equalTo(textMessage)); + + // Binary message is discarded by implementation. + session.getBasicRemote().sendBinary(BufferUtil.toBuffer("hello world")); + + // Next text message is still received. + session.getBasicRemote().sendText(textMessage); + assertThat(TEXT_MESSAGES.poll(5, TimeUnit.SECONDS), equalTo(textMessage)); + + session.close(); + assertTrue(eventSocket.closeLatch.await(5, TimeUnit.SECONDS)); + } +}