diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentExtension.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentExtension.java index f9600cc4f6e..6c578936ed1 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentExtension.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentExtension.java @@ -101,7 +101,7 @@ public class FragmentExtension extends AbstractExtension implements DemandChain { if (first) { - if (OpCode.isControlFrame(frame.getOpCode())) + if (frame.isControlFrame()) { emitFrame(frame, callback); return true; diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java index 4cf2af7565a..fe29910317f 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java @@ -43,7 +43,15 @@ import org.slf4j.LoggerFactory; public class FrameFlusher extends IteratingCallback { - public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY); + public static final Frame FLUSH_FRAME = new Frame(OpCode.UNDEFINED) + { + @Override + public boolean isControlFrame() + { + return true; + } + }; + private static final Logger LOG = LoggerFactory.getLogger(FrameFlusher.class); private static final Throwable CLOSED_CHANNEL = new StaticException("Closed"); diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java index 3430c65697a..f83a667b210 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java @@ -268,7 +268,7 @@ public class PerMessageDeflateExtension extends AbstractExtension implements Dem @Override protected boolean onFrame(Frame frame, Callback callback, boolean batch) { - if (OpCode.isControlFrame(frame.getOpCode())) + if (frame.isControlFrame()) { nextOutgoingFrame(frame, callback, batch); return true; @@ -373,7 +373,7 @@ public class PerMessageDeflateExtension extends AbstractExtension implements Dem { if (first) { - if (OpCode.isControlFrame(frame.getOpCode())) + if (frame.isControlFrame()) { emitFrame(frame, callback); return true; diff --git a/jetty-websocket/websocket-javax-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/FlushFrameTest.java b/jetty-websocket/websocket-javax-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/FlushFrameTest.java new file mode 100644 index 00000000000..1083a3f8975 --- /dev/null +++ b/jetty-websocket/websocket-javax-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/FlushFrameTest.java @@ -0,0 +1,184 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.websocket.javax.tests; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.websocket.EndpointConfig; +import javax.websocket.Session; +import javax.websocket.server.ServerEndpoint; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.websocket.core.CloseStatus; +import org.eclipse.jetty.websocket.core.CoreSession; +import org.eclipse.jetty.websocket.core.Frame; +import org.eclipse.jetty.websocket.core.FrameHandler; +import org.eclipse.jetty.websocket.core.OpCode; +import org.eclipse.jetty.websocket.core.client.CoreClientUpgradeRequest; +import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; +import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.eclipse.jetty.websocket.core.CloseStatus.NORMAL; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class FlushFrameTest +{ + private Server _server; + private ServerConnector _connector; + private WebSocketCoreClient _client; + + @BeforeEach + public void before() throws Exception + { + _server = new Server(); + _connector = new ServerConnector(_server); + _server.addConnector(_connector); + + ServletContextHandler contextHandler = new ServletContextHandler(); + JavaxWebSocketServletContainerInitializer.configure(contextHandler, (context, container) -> + { + container.addEndpoint(WebSocketServerEndpoint.class); + }); + _server.setHandler(contextHandler); + _server.start(); + + _client = new WebSocketCoreClient(); + _client.start(); + } + + @AfterEach + public void after() throws Exception + { + _client.stop(); + _server.stop(); + } + + @ServerEndpoint("/") + public static class WebSocketServerEndpoint extends EventSocket + { + @Override + public void onOpen(Session session, EndpointConfig endpointConfig) + { + super.onOpen(session, endpointConfig); + + try + { + session.getBasicRemote().setBatchingAllowed(true); + session.getBasicRemote().flushBatch(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + } + + @Test + public void testFlushWithPermessageDeflate() throws Exception + { + URI uri = URI.create("ws://localhost:" + _connector.getLocalPort()); + TestFrameHandler testFrameHandler = new TestFrameHandler(); + CoreClientUpgradeRequest upgradeRequest = CoreClientUpgradeRequest.from(_client, uri, testFrameHandler); + upgradeRequest.addExtensions("permessage-deflate"); + + // Once WebSocket connection is opened the server does a flush. + CoreSession coreSession = _client.connect(upgradeRequest).get(5, TimeUnit.SECONDS); + coreSession.close(NORMAL, null, Callback.NOOP); + + // Receive the close frame and succeed the callback. + TestFrameHandler.FrameCallback received = testFrameHandler.received.poll(5, TimeUnit.SECONDS); + assertNotNull(received); + assertThat(received.frame.getOpCode(), equalTo(OpCode.CLOSE)); + received.callback.succeeded(); + + // FrameHandler is closed normally. + assertTrue(testFrameHandler.closeLatch.await(5, TimeUnit.SECONDS)); + assertNull(testFrameHandler.error); + assertThat(testFrameHandler.closeStatus.getCode(), equalTo(NORMAL)); + + // We received no other frames. + assertTrue(testFrameHandler.received.isEmpty()); + } + + public static class TestFrameHandler implements FrameHandler + { + CoreSession coreSession; + Throwable error; + CloseStatus closeStatus; + CountDownLatch openLatch = new CountDownLatch(1); + CountDownLatch closeLatch = new CountDownLatch(1); + BlockingArrayQueue received = new BlockingArrayQueue<>(); + + public static class FrameCallback + { + public FrameCallback(Frame frame, Callback callback) + { + this.frame = frame; + this.callback = callback; + } + + public Frame frame; + public Callback callback; + } + + @Override + public void onOpen(CoreSession coreSession, Callback callback) + { + this.coreSession = coreSession; + callback.succeeded(); + coreSession.demand(1); + openLatch.countDown(); + } + + @Override + public void onFrame(Frame frame, Callback callback) + { + received.offer(new FrameCallback(frame, callback)); + coreSession.demand(1); + } + + @Override + public void onError(Throwable cause, Callback callback) + { + this.error = cause; + callback.succeeded(); + } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + this.closeStatus = closeStatus; + callback.succeeded(); + closeLatch.countDown(); + } + + @Override + public boolean isDemanding() + { + return true; + } + } +}