From 61fc95aa36bdb92cd24abeb315cec4c4e42b438d Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Tue, 13 Jun 2017 09:47:44 -0700 Subject: [PATCH] Issue #1599 - WebSocketClient early close scenarios + Connection timeout results in: endpoint.onError(WebSocketTimeoutException) localSession.close(SHUTDOWN) --- .../websocket/common/WebSocketSession.java | 5 + .../io/AbstractWebSocketConnection.java | 101 +++--- .../tests/AbstractTrackingEndpoint.java | 7 + .../tests/client/ClientEarlyCloseTest.java | 291 ++++++++++++++++ .../tests/server/WebSocketCloseTest.java | 328 ------------------ .../test/resources/jetty-logging.properties | 2 + 6 files changed, 361 insertions(+), 373 deletions(-) create mode 100644 jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientEarlyCloseTest.java delete mode 100644 jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/WebSocketCloseTest.java diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 808c9285b51..3bf3d375acd 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -59,6 +59,7 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.api.WebSocketTimeoutException; import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.IncomingFrames; @@ -697,6 +698,10 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem close(ce.getStatusCode(), ce.getMessage(), callback); } + else if (cause instanceof WebSocketTimeoutException) + { + close(StatusCode.SHUTDOWN, cause.getMessage(), onDisconnectCallback); + } else { LOG.warn("Unhandled Error (closing connection)", cause); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index ffbb16409ba..d42c77a46f0 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -42,6 +42,7 @@ import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.FrameCallback; import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.api.WebSocketTimeoutException; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.common.Generator; @@ -72,7 +73,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp * Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload) */ private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH; - + private final Logger LOG; private final ByteBufferPool bufferPool; private final Generator generator; @@ -86,18 +87,18 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp private final List listeners = new CopyOnWriteArrayList<>(); private List extensions; private ByteBuffer networkBuffer; - + public AbstractWebSocketConnection(EndPoint endp, Executor executor, WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack) { super(endp,executor); - + Objects.requireNonNull(endp, "EndPoint"); Objects.requireNonNull(executor, "Executor"); Objects.requireNonNull(policy, "WebSocketPolicy"); Objects.requireNonNull(bufferPool, "ByteBufferPool"); - + LOG = Log.getLogger(AbstractWebSocketConnection.class.getName() + "." + policy.getBehavior()); - + this.id = String.format("%s:%d->%s:%d", endp.getLocalAddress().getAddress().getHostAddress(), endp.getLocalAddress().getPort(), @@ -106,7 +107,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp this.policy = policy; this.bufferPool = bufferPool; this.extensionStack = extensionStack; - + this.generator = new Generator(policy,bufferPool); this.parser = new Parser(policy,bufferPool,this); this.extensions = new ArrayList<>(); @@ -114,12 +115,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp this.flusher = new Flusher(policy.getOutputBufferSize(),generator,endp); this.setInputBufferSize(policy.getInputBufferSize()); this.setMaxIdleTimeout(policy.getIdleTimeout()); - + this.extensionStack.setPolicy(this.policy); this.extensionStack.configure(this.parser); this.extensionStack.configure(this.generator); } - + @Override public Executor getExecutor() { @@ -131,14 +132,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { if (LOG.isDebugEnabled()) LOG.debug("disconnect()"); - + // close FrameFlusher, we cannot write anymore at this point. flusher.close(); - + closed.set(true); close(); } - + @Override public ByteBufferPool getBufferPool() { @@ -184,12 +185,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { return parser; } - + public WebSocketPolicy getPolicy() { return policy; } - + @Override public InetSocketAddress getRemoteAddress() { @@ -214,21 +215,31 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { if (LOG.isDebugEnabled()) LOG.debug("onClose()"); - + closed.set(true); - + flusher.close(); super.onClose(); } + @Override + public boolean onIdleExpired() + { + if (LOG.isDebugEnabled()) + LOG.debug("onIdleExpired()"); + + notifyError(new WebSocketTimeoutException("Connection Idle Timeout")); + return true; + } + @Override public boolean onFrame(Frame frame) { AtomicBoolean result = new AtomicBoolean(false); - + if(LOG.isDebugEnabled()) LOG.debug("onFrame({})", frame); - + extensionStack.incomingFrame(frame, new FrameCallback() { @Override @@ -236,7 +247,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { if(LOG.isDebugEnabled()) LOG.debug("onFrame({}).succeed()", frame); - + parser.release(frame); if(!result.compareAndSet(false,true)) { @@ -244,28 +255,28 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp fillAndParse(); } } - + @Override public void fail(Throwable cause) { if(LOG.isDebugEnabled()) LOG.debug("onFrame("+ frame + ").fail()", cause); parser.release(frame); - + // notify session & endpoint notifyError(cause); } }); - + if(result.compareAndSet(false, true)) { // callback hasn't been notified yet return false; } - + return true; } - + private ByteBuffer getNetworkBuffer() { synchronized (this) @@ -277,7 +288,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp return networkBuffer; } } - + private void releaseNetworkBuffer(ByteBuffer buffer) { synchronized (this) @@ -287,14 +298,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp networkBuffer = null; } } - + @Override public void onFillable() { getNetworkBuffer(); fillAndParse(); } - + private void fillAndParse() { try @@ -305,25 +316,25 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { return; } - + ByteBuffer nBuffer = getNetworkBuffer(); - + if (!parser.parse(nBuffer)) return; - + // Shouldn't reach this point if buffer has un-parsed bytes assert(!nBuffer.hasRemaining()); - + int filled = getEndPoint().fill(nBuffer); - + if(LOG.isDebugEnabled()) LOG.debug("endpointFill() filled={}: {}", filled, BufferUtil.toDetailString(nBuffer)); - + if (filled < 0) { releaseNetworkBuffer(nBuffer); return; } - + if (filled == 0) { releaseNetworkBuffer(nBuffer); @@ -337,8 +348,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp notifyError(t); } } - - + + /** * Extra bytes from the initial HTTP upgrade that need to * be processed by the websocket parser before starting @@ -351,7 +362,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { LOG.debug("set Initial Buffer - {}",BufferUtil.toDetailString(prefilled)); } - + if ((prefilled != null) && (prefilled.hasRemaining())) { networkBuffer = bufferPool.acquire(prefilled.remaining(), true); @@ -367,7 +378,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { LOG.warn("Unhandled Connection Error", cause); } - + for (LogicalConnection.Listener listener : listeners) { try @@ -381,7 +392,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp } } } - + /** * Physical connection Open. */ @@ -416,7 +427,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp flusher.enqueue(frame,callback,batchMode); } - + @Override public void resume() { @@ -425,19 +436,19 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp fillAndParse(); } } - + public boolean addListener(LogicalConnection.Listener listener) { super.addListener(listener); return this.listeners.add(listener); } - + public boolean removeListener(LogicalConnection.Listener listener) { super.removeListener(listener); return this.listeners.remove(listener); } - + /** * Get the list of extensions in use. *

@@ -469,7 +480,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp getEndPoint().setIdleTimeout(ms); } } - + @Override public SuspendToken suspend() { @@ -499,7 +510,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp generator, parser); } - + @Override public int hashCode() { @@ -549,7 +560,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { LOG.debug("onUpgradeTo({})", BufferUtil.toDetailString(prefilled)); } - + setInitialBuffer(prefilled); } } diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/AbstractTrackingEndpoint.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/AbstractTrackingEndpoint.java index 2c6cbb3c777..601757fd277 100644 --- a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/AbstractTrackingEndpoint.java +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/AbstractTrackingEndpoint.java @@ -41,6 +41,7 @@ public abstract class AbstractTrackingEndpoint public CountDownLatch openLatch = new CountDownLatch(1); public CountDownLatch closeLatch = new CountDownLatch(1); + public CountDownLatch errorLatch = new CountDownLatch(1); public AtomicReference closeInfo = new AtomicReference<>(); public AtomicReference error = new AtomicReference<>(); @@ -88,6 +89,11 @@ public abstract class AbstractTrackingEndpoint { assertTrue(prefix + " onOpen event", openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS)); } + + public void awaitErrorEvent(String prefix) throws InterruptedException + { + assertTrue(prefix + " onError event", errorLatch.await(Defaults.CLOSE_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS)); + } protected void onWSOpen(T session) { @@ -124,5 +130,6 @@ public abstract class AbstractTrackingEndpoint LOG.warn("onError should only happen once - Extra/Excess Cause", cause); fail("onError should only happen once!"); } + this.errorLatch.countDown(); } } diff --git a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientEarlyCloseTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientEarlyCloseTest.java new file mode 100644 index 00000000000..fcdccd92e52 --- /dev/null +++ b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientEarlyCloseTest.java @@ -0,0 +1,291 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.client; + +import static org.hamcrest.CoreMatchers.anything; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.client.HttpResponseException; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.api.WebSocketTimeoutException; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.eclipse.jetty.websocket.tests.Defaults; +import org.eclipse.jetty.websocket.tests.SimpleServletServer; +import org.eclipse.jetty.websocket.tests.TrackingEndpoint; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestName; + +/** + * Tests various early drop/close scenarios + */ +public class ClientEarlyCloseTest +{ + /** + * On Open, close socket + */ + @WebSocket + public static class OpenDropSocket + { + private static final Logger LOG = Log.getLogger(OpenDropSocket.class); + + @OnWebSocketConnect + public void onOpen(Session sess) + { + LOG.debug("onOpen({})", sess); + try + { + sess.disconnect(); + } + catch (IOException ignore) + { + } + } + } + + /** + * On Open, throw unhandled exception + */ + @WebSocket + public static class OpenFailSocket + { + private static final Logger LOG = Log.getLogger(OpenFailSocket.class); + + @OnWebSocketConnect + public void onOpen(Session sess) + { + LOG.debug("onOpen({})", sess); + // Test failure due to unhandled exception + // this should trigger a fast-fail closure during open/connect + throw new RuntimeException("Intentional FastFail"); + } + } + + /** + * On Message, drop connection + */ + public static class MessageDropSocket extends WebSocketAdapter + { + private static final Logger LOG = Log.getLogger(MessageDropSocket.class); + + @Override + public void onWebSocketText(String message) + { + LOG.debug("onWebSocketText({})", message); + try + { + getSession().disconnect(); + } + catch (IOException ignore) + { + } + } + } + + public static class EarlyCloseServlet extends WebSocketServlet implements WebSocketCreator + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.setCreator(this); + } + + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) + { + if (req.hasSubProtocol("opendrop")) + { + resp.setAcceptedSubProtocol("opendrop"); + return new OpenDropSocket(); + } + + if (req.hasSubProtocol("openfail")) + { + resp.setAcceptedSubProtocol("openfail"); + return new OpenFailSocket(); + } + + if (req.hasSubProtocol("msgdrop")) + { + resp.setAcceptedSubProtocol("msgdrop"); + return new MessageDropSocket(); + } + + return null; + } + } + + @Rule + public TestName testname = new TestName(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private SimpleServletServer server; + private WebSocketClient client; + + @Before + public void startServer() throws Exception + { + server = new SimpleServletServer(new EarlyCloseServlet()); + server.start(); + } + + @After + public void stopServer() throws Exception + { + server.stop(); + } + + @Before + public void startClient() throws Exception + { + client = new WebSocketClient(); + client.start(); + } + + @After + public void stopClient() throws Exception + { + client.stop(); + } + + /** + * The remote endpoint sends a close frame immediately. + * + * @throws Exception on test failure + */ + @Test + public void immediateDrop() throws Exception + { + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.setSubProtocols("openclose"); + + TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName()); + + URI wsUri = server.getServerUri().resolve("/"); + Future clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest); + + expectedException.expect(ExecutionException.class); + expectedException.expectCause(instanceOf(HttpResponseException.class)); + expectedException.expectMessage(containsString("503 Endpoint Creation Failed")); + clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + + /** + * The remote endpoint performed upgrade handshake ok, but failed its onOpen. + * + * @throws Exception on test failure + */ + @Test + public void remoteOpenFailure() throws Exception + { + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.setSubProtocols("openfail"); + + TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName()); + + URI wsUri = server.getServerUri().resolve("/"); + Future clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest); + + Session session = clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + try + { + clientSocket.openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + assertThat("OnOpen.UpgradeRequest", clientSocket.openUpgradeRequest, notNullValue()); + assertThat("OnOpen.UpgradeResponse", clientSocket.openUpgradeResponse, notNullValue()); + assertThat("Negotiated SubProtocol", clientSocket.openUpgradeResponse.getAcceptedSubProtocol(), is("openfail")); + + clientSocket.awaitCloseEvent("Client"); + clientSocket.assertCloseInfo("Client", StatusCode.SERVER_ERROR, anything()); + } + finally + { + session.close(); + } + } + + /** + * The connection has performed handshake successfully. + *

+ * Send of message to remote results in dropped connection on server side. + *

+ * + * @throws Exception on test failure + */ + @Test + public void messageDrop() throws Exception + { + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.setSubProtocols("msgdrop"); + + TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName()); + + URI wsUri = server.getServerUri().resolve("/"); + client.setMaxIdleTimeout(3000); + Future clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest); + + Session session = clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + try + { + clientSocket.openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + assertThat("OnOpen.UpgradeRequest", clientSocket.openUpgradeRequest, notNullValue()); + assertThat("OnOpen.UpgradeResponse", clientSocket.openUpgradeResponse, notNullValue()); + assertThat("Negotiated SubProtocol", clientSocket.openUpgradeResponse.getAcceptedSubProtocol(), is("msgdrop")); + + session.getRemote().sendString("drop-me"); + + clientSocket.awaitErrorEvent("Client"); + clientSocket.assertErrorEvent("Client", instanceOf(WebSocketTimeoutException.class), containsString("Connection Idle Timeout")); + } + finally + { + session.close(); + } + } +} diff --git a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/WebSocketCloseTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/WebSocketCloseTest.java deleted file mode 100644 index b57cc1ca2a3..00000000000 --- a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/WebSocketCloseTest.java +++ /dev/null @@ -1,328 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.websocket.tests.server; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.Matchers.containsString; -import static org.junit.Assert.assertThat; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.log.StacklessLogging; -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.StatusCode; -import org.eclipse.jetty.websocket.api.WebSocketAdapter; -import org.eclipse.jetty.websocket.api.WebSocketConstants; -import org.eclipse.jetty.websocket.common.CloseInfo; -import org.eclipse.jetty.websocket.common.OpCode; -import org.eclipse.jetty.websocket.common.WebSocketFrame; -import org.eclipse.jetty.websocket.common.WebSocketSession; -import org.eclipse.jetty.websocket.common.frames.TextFrame; -import org.eclipse.jetty.websocket.server.WebSocketServerFactory; -import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; -import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; -import org.eclipse.jetty.websocket.servlet.WebSocketCreator; -import org.eclipse.jetty.websocket.servlet.WebSocketServlet; -import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; -import org.eclipse.jetty.websocket.tests.LocalFuzzer; -import org.eclipse.jetty.websocket.tests.SimpleServletServer; -import org.eclipse.jetty.websocket.tests.UpgradeUtils; -import org.hamcrest.Matchers; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -/** - * Tests various close scenarios - */ -@Ignore("Need to fix") -public class WebSocketCloseTest -{ - /** - * On Message, return container information - */ - public static class ContainerSocket extends WebSocketAdapter - { - private static final Logger LOG = Log.getLogger(WebSocketCloseTest.ContainerSocket.class); - private final WebSocketServerFactory container; - private Session session; - - public ContainerSocket(WebSocketServerFactory container) - { - this.container = container; - } - - @Override - public void onWebSocketText(String message) - { - LOG.debug("onWebSocketText({})", message); - if (message.equalsIgnoreCase("openSessions")) - { - try - { - Collection sessions = container.getOpenSessions(); - - StringBuilder ret = new StringBuilder(); - ret.append("openSessions.size=").append(sessions.size()).append('\n'); - int idx = 0; - for (WebSocketSession sess : sessions) - { - ret.append('[').append(idx++).append("] ").append(sess.toString()).append('\n'); - } - session.getRemote().sendString(ret.toString()); - } - catch (IOException e) - { - LOG.warn(e); - } - } - session.close(StatusCode.NORMAL, "ContainerSocket"); - } - - @Override - public void onWebSocketConnect(Session sess) - { - LOG.debug("onWebSocketConnect({})", sess); - this.session = sess; - } - } - - /** - * On Connect, close socket - */ - public static class FastCloseSocket extends WebSocketAdapter - { - private static final Logger LOG = Log.getLogger(WebSocketCloseTest.FastCloseSocket.class); - - @Override - public void onWebSocketConnect(Session sess) - { - LOG.debug("onWebSocketConnect({})", sess); - sess.close(StatusCode.NORMAL, "FastCloseServer"); - } - } - - /** - * On Connect, throw unhandled exception - */ - public static class FastFailSocket extends WebSocketAdapter - { - private static final Logger LOG = Log.getLogger(WebSocketCloseTest.FastFailSocket.class); - - @Override - public void onWebSocketConnect(Session sess) - { - LOG.debug("onWebSocketConnect({})", sess); - // Test failure due to unhandled exception - // this should trigger a fast-fail closure during open/connect - throw new RuntimeException("Intentional FastFail"); - } - } - - /** - * On Message, drop connection - */ - public static class DropServerConnectionSocket extends WebSocketAdapter - { - @Override - public void onWebSocketText(String message) - { - try - { - getSession().disconnect(); - } - catch (IOException ignore) - { - } - } - } - - public static class CloseServlet extends WebSocketServlet implements WebSocketCreator - { - private WebSocketServerFactory serverFactory; - - @Override - public void configure(WebSocketServletFactory factory) - { - factory.setCreator(this); - if (factory instanceof WebSocketServerFactory) - { - this.serverFactory = (WebSocketServerFactory) factory; - } - } - - @Override - public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) - { - if (req.hasSubProtocol("fastclose")) - { - return new FastCloseSocket(); - } - - if (req.hasSubProtocol("fastfail")) - { - return new FastFailSocket(); - } - - if (req.hasSubProtocol("drop")) - { - return new DropServerConnectionSocket(); - } - - if (req.hasSubProtocol("container")) - { - return new ContainerSocket(serverFactory); - } - - return new RFC6455Socket(); - } - } - - private SimpleServletServer server; - - @Before - public void startServer() throws Exception - { - server = new SimpleServletServer(new CloseServlet()); - server.start(); - } - - @After - public void stopServer() throws Exception - { - server.stop(); - } - - /** - * Test fast close (bug #403817) - * - * @throws Exception on test failure - */ - @Test - public void fastClose() throws Exception - { - Map upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders(); - upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "fastclose"); - - List expect = new ArrayList<>(); - expect.add(new CloseInfo(StatusCode.NORMAL, "FastCloseServer").asFrame()); - - try (LocalFuzzer session = server.newLocalFuzzer("/", upgradeHeaders)) - { - session.sendFrames(new CloseInfo(StatusCode.NORMAL).asFrame()); - session.expect(expect); - } - } - - /** - * Test fast fail (bug #410537) - * - * @throws Exception on test failure - */ - @Test - public void fastFail() throws Exception - { - Map upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders(); - upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "fastfail"); - - List expect = new ArrayList<>(); - expect.add(new CloseInfo(StatusCode.SERVER_ERROR).asFrame()); - - try (StacklessLogging ignore = new StacklessLogging(FastFailSocket.class); - LocalFuzzer session = server.newLocalFuzzer("/", upgradeHeaders)) - { - session.expect(expect); - } - } - - @Test - public void dropServerConnection() throws Exception - { - Map upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders(); - upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "drop"); - - try (LocalFuzzer session = server.newLocalFuzzer("/", upgradeHeaders)) - { - session.sendFrames(new TextFrame().setPayload("drop")); - BlockingQueue framesQueue = session.getOutputFrames(); - assertThat("No frames as output", framesQueue.size(), Matchers.is(0)); - } - } - - /** - * - * @throws Exception on test failure - */ - @Test - public void testFastFailFastClose() throws Exception - { - fastFail(); - fastClose(); - } - - - /** - * Test session open session cleanup (bug #474936) - * - * @throws Exception on test failure - */ - @Test - public void testOpenSessionCleanup() throws Exception - { - fastFail(); - fastClose(); - dropClientConnection(); - - Map upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders(); - upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "container"); - - try (LocalFuzzer session = server.newLocalFuzzer("/?openSessions", upgradeHeaders)) - { - session.sendFrames( - new TextFrame().setPayload("openSessions"), - new CloseInfo(StatusCode.NORMAL).asFrame() - ); - - BlockingQueue framesQueue = session.getOutputFrames(); - WebSocketFrame frame = framesQueue.poll(3, TimeUnit.SECONDS); - assertThat("Frame.opCode", frame.getOpCode(), is(OpCode.TEXT)); - assertThat("Frame.text-payload", frame.getPayloadAsUTF8(), containsString("openSessions.size=1\n")); - } - } - - private void dropClientConnection() throws Exception - { - Map upgradeHeaders = UpgradeUtils.newDefaultUpgradeRequestHeaders(); - upgradeHeaders.put(WebSocketConstants.SEC_WEBSOCKET_PROTOCOL, "container"); - - try (LocalFuzzer ignored = server.newLocalFuzzer("/", upgradeHeaders)) - { - // do nothing, just let endpoint close - } - } -} diff --git a/jetty-websocket/websocket-tests/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-tests/src/test/resources/jetty-logging.properties index 645e626e47e..1c7cf791ed8 100644 --- a/jetty-websocket/websocket-tests/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/websocket-tests/src/test/resources/jetty-logging.properties @@ -26,6 +26,8 @@ org.eclipse.jetty.LEVEL=WARN # org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG # org.eclipse.jetty.io.FillInterest.LEVEL=DEBUG # org.eclipse.jetty.client.LEVEL=DEBUG +# org.eclipse.jetty.io.LEVEL=DEBUG +# org.eclipse.jetty.io.ManagedSelector.LEVEL=INFO # org.eclipse.jetty.websocket.LEVEL=DEBUG # org.eclipse.jetty.websocket.LEVEL=INFO # org.eclipse.jetty.websocket.jsr356.messages.LEVEL=DEBUG