From f3f13c4ca9a8a8c899c648d06abd745c97bf93c8 Mon Sep 17 00:00:00 2001 From: Lachlan Date: Wed, 3 Jul 2019 11:09:34 +1000 Subject: [PATCH] Issue #2061 - fail current entry in CompressExtension on failure (#3830) * Issue #2061 - fail current entry in CompressExtension on failure Signed-off-by: Lachlan Roberts * Issue #2061 - do not notify callback failure twice Signed-off-by: Lachlan Roberts --- .../websocket/tests/WriteAfterStopTest.java | 115 ++++++++++++++++++ .../compress/CompressExtension.java | 6 +- 2 files changed, 118 insertions(+), 3 deletions(-) create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WriteAfterStopTest.java diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WriteAfterStopTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WriteAfterStopTest.java new file mode 100644 index 00000000000..211efcddfa5 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WriteAfterStopTest.java @@ -0,0 +1,115 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.common.WebSocketSession; +import org.eclipse.jetty.websocket.common.extensions.compress.CompressExtension; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class WriteAfterStopTest +{ + + public class UpgradeServlet extends WebSocketServlet + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.setCreator(((req, resp) -> serverSocket)); + } + } + + private Server server = new Server(); + private WebSocketClient client = new WebSocketClient(); + private EventSocket serverSocket = new EventSocket(); + private ServerConnector connector; + + @BeforeEach + public void start() throws Exception + { + connector = new ServerConnector(server); + server.addConnector(connector); + + ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS); + contextHandler.setContextPath("/"); + contextHandler.addServlet(new ServletHolder(new UpgradeServlet()), "/"); + server.setHandler(contextHandler); + + server.start(); + client.start(); + } + + @AfterEach + public void stop() throws Exception + { + client.stop(); + server.stop(); + } + + @Test + public void test() throws Exception + { + URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/"); + EventSocket clientSocket = new EventSocket(); + + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.addExtensions("permessage-deflate"); + Session session = client.connect(clientSocket, uri, upgradeRequest).get(5, TimeUnit.SECONDS); + clientSocket.getSession().getRemote().sendStringByFuture("init deflater"); + assertThat(serverSocket.receivedMessages.poll(5, TimeUnit.SECONDS), is("init deflater")); + session.close(StatusCode.NORMAL, null); + + // make sure both sides are closed + clientSocket.session.close(); + assertTrue(clientSocket.closed.await(5, TimeUnit.SECONDS)); + assertTrue(serverSocket.closed.await(5, TimeUnit.SECONDS)); + + // check we closed normally + assertThat(clientSocket.closeCode, is(StatusCode.NORMAL)); + assertThat(serverSocket.closeCode, is(StatusCode.NORMAL)); + + ((WebSocketSession)session).stop(); + + try (StacklessLogging stacklessLogging = new StacklessLogging(CompressExtension.class)) + { + assertThrows(NullPointerException.class, + () -> session.getRemote().sendString("hello world")); + } + } +} \ No newline at end of file diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 8d1c1bb8bb3..129fe6220ca 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -429,6 +429,9 @@ public abstract class CompressExtension extends AbstractExtension @Override public void failed(Throwable x) { + notifyCallbackFailure(current.callback, x); + // If something went wrong, very likely the compression context + // will be invalid, so we need to fail this IteratingCallback. LOG.warn(x); super.failed(x); } @@ -596,9 +599,6 @@ public abstract class CompressExtension extends AbstractExtension @Override public void writeFailed(Throwable x) { - notifyCallbackFailure(current.callback, x); - // If something went wrong, very likely the compression context - // will be invalid, so we need to fail this IteratingCallback. failed(x); } }