diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ManyConnectionsCleanupTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ManyConnectionsCleanupTest.java new file mode 100644 index 00000000000..0adf4a4b4e8 --- /dev/null +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ManyConnectionsCleanupTest.java @@ -0,0 +1,366 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.server; + +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jetty.toolchain.test.EventQueue; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.util.log.StdErrLog; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.common.CloseInfo; +import org.eclipse.jetty.websocket.common.OpCode; +import org.eclipse.jetty.websocket.common.WebSocketFrame; +import org.eclipse.jetty.websocket.common.WebSocketSession; +import org.eclipse.jetty.websocket.common.frames.TextFrame; +import org.eclipse.jetty.websocket.common.test.BlockheadClient; +import org.eclipse.jetty.websocket.server.helper.RFCSocket; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests various close scenarios that should result in Open Session cleanup + */ +public class ManyConnectionsCleanupTest +{ + static class AbstractCloseSocket extends WebSocketAdapter + { + public CountDownLatch closeLatch = new CountDownLatch(1); + public String closeReason = null; + public int closeStatusCode = -1; + public List errors = new ArrayList<>(); + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + LOG.debug("onWebSocketClose({}, {})",statusCode,reason); + this.closeStatusCode = statusCode; + this.closeReason = reason; + closeLatch.countDown(); + } + + @Override + public void onWebSocketError(Throwable cause) + { + errors.add(cause); + } + } + + @SuppressWarnings("serial") + public static class CloseServlet extends WebSocketServlet implements WebSocketCreator + { + private WebSocketServerFactory serverFactory; + private AtomicInteger calls = new AtomicInteger(0); + + @Override + public void configure(WebSocketServletFactory factory) + { + factory.setCreator(this); + if (factory instanceof WebSocketServerFactory) + { + this.serverFactory = (WebSocketServerFactory)factory; + } + } + + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) + { + if (req.hasSubProtocol("fastclose")) + { + closeSocket = new FastCloseSocket(calls); + return closeSocket; + } + + if (req.hasSubProtocol("fastfail")) + { + closeSocket = new FastFailSocket(calls); + return closeSocket; + } + + if (req.hasSubProtocol("container")) + { + closeSocket = new ContainerSocket(serverFactory,calls); + return closeSocket; + } + return new RFCSocket(); + } + } + + /** + * On Message, return container information + */ + public static class ContainerSocket extends AbstractCloseSocket + { + private static final Logger LOG = Log.getLogger(ManyConnectionsCleanupTest.ContainerSocket.class); + private final WebSocketServerFactory container; + private final AtomicInteger calls; + private Session session; + + public ContainerSocket(WebSocketServerFactory container, AtomicInteger calls) + { + this.container = container; + this.calls = calls; + } + + @Override + public void onWebSocketText(String message) + { + LOG.debug("onWebSocketText({})",message); + calls.incrementAndGet(); + if (message.equalsIgnoreCase("openSessions")) + { + Set sessions = container.getOpenSessions(); + + StringBuilder ret = new StringBuilder(); + ret.append("openSessions.size=").append(sessions.size()).append('\n'); + int idx = 0; + for (WebSocketSession sess : sessions) + { + ret.append('[').append(idx++).append("] ").append(sess.toString()).append('\n'); + } + session.getRemote().sendStringByFuture(ret.toString()); + session.close(StatusCode.NORMAL,"ContainerSocket"); + } else if(message.equalsIgnoreCase("calls")) + { + session.getRemote().sendStringByFuture(String.format("calls=%,d",calls.get())); + } + } + + @Override + public void onWebSocketConnect(Session sess) + { + LOG.debug("onWebSocketConnect({})",sess); + this.session = sess; + } + } + + /** + * On Connect, close socket + */ + public static class FastCloseSocket extends AbstractCloseSocket + { + private static final Logger LOG = Log.getLogger(ManyConnectionsCleanupTest.FastCloseSocket.class); + private final AtomicInteger calls; + + public FastCloseSocket(AtomicInteger calls) + { + this.calls = calls; + } + + @Override + public void onWebSocketConnect(Session sess) + { + LOG.debug("onWebSocketConnect({})",sess); + calls.incrementAndGet(); + sess.close(StatusCode.NORMAL,"FastCloseServer"); + } + } + + /** + * On Connect, throw unhandled exception + */ + public static class FastFailSocket extends AbstractCloseSocket + { + private static final Logger LOG = Log.getLogger(ManyConnectionsCleanupTest.FastFailSocket.class); + private final AtomicInteger calls; + + public FastFailSocket(AtomicInteger calls) + { + this.calls = calls; + } + + @Override + public void onWebSocketConnect(Session sess) + { + LOG.debug("onWebSocketConnect({})",sess); + calls.incrementAndGet(); + // Test failure due to unhandled exception + // this should trigger a fast-fail closure during open/connect + throw new RuntimeException("Intentional FastFail"); + } + } + + private static final Logger LOG = Log.getLogger(ManyConnectionsCleanupTest.class); + + private static SimpleServletServer server; + private static AbstractCloseSocket closeSocket; + + @BeforeClass + public static void startServer() throws Exception + { + server = new SimpleServletServer(new CloseServlet()); + server.start(); + } + + @AfterClass + public static void stopServer() + { + server.stop(); + } + + /** + * Test session open session cleanup (bug #474936) + * + * @throws Exception + * on test failure + */ + @Test + public void testOpenSessionCleanup() throws Exception + { + int iterationCount = 100; + + StdErrLog.getLogger(FastFailSocket.class).setLevel(StdErrLog.LEVEL_OFF); + + StdErrLog sessLog = StdErrLog.getLogger(WebSocketSession.class); + int oldLevel = sessLog.getLevel(); + sessLog.setLevel(StdErrLog.LEVEL_OFF); + + for (int requests = 0; requests < iterationCount; requests++) + { + fastFail(); + fastClose(); + dropConnection(); + } + + sessLog.setLevel(oldLevel); + + try (BlockheadClient client = new BlockheadClient(server.getServerUri())) + { + client.setProtocols("container"); + client.setTimeout(1,TimeUnit.SECONDS); + client.connect(); + client.sendStandardRequest(); + client.expectUpgradeResponse(); + + client.write(new TextFrame().setPayload("calls")); + client.write(new TextFrame().setPayload("openSessions")); + + EventQueue frames = client.readFrames(3,6,TimeUnit.SECONDS); + WebSocketFrame frame; + String resp; + + frame = frames.poll(); + assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.TEXT)); + resp = frame.getPayloadAsUTF8(); + assertThat("Should only have 1 open session",resp,containsString("calls=" + ((iterationCount * 2) + 1))); + + frame = frames.poll(); + assertThat("frames[1].opcode",frame.getOpCode(),is(OpCode.TEXT)); + resp = frame.getPayloadAsUTF8(); + assertThat("Should only have 1 open session",resp,containsString("openSessions.size=1\n")); + + frame = frames.poll(); + assertThat("frames[2].opcode",frame.getOpCode(),is(OpCode.CLOSE)); + CloseInfo close = new CloseInfo(frame); + assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL)); + client.write(close.asFrame()); // respond with close + + // ensure server socket got close event + assertThat("Open Sessions Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true)); + assertThat("Open Sessions.statusCode",closeSocket.closeStatusCode,is(StatusCode.NORMAL)); + assertThat("Open Sessions.errors",closeSocket.errors.size(),is(0)); + } + } + + private void fastClose() throws Exception + { + try (BlockheadClient client = new BlockheadClient(server.getServerUri())) + { + client.setProtocols("fastclose"); + client.setTimeout(1,TimeUnit.SECONDS); + try (StacklessLogging scope = new StacklessLogging(WebSocketSession.class)) + { + client.connect(); + client.sendStandardRequest(); + client.expectUpgradeResponse(); + + client.readFrames(1,1,TimeUnit.SECONDS); + + CloseInfo close = new CloseInfo(StatusCode.NORMAL,"Normal"); + assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL)); + + // Notify server of close handshake + client.write(close.asFrame()); // respond with close + + // ensure server socket got close event + assertThat("Fast Close Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true)); + assertThat("Fast Close.statusCode",closeSocket.closeStatusCode,is(StatusCode.NORMAL)); + } + } + } + + private void fastFail() throws Exception + { + try (BlockheadClient client = new BlockheadClient(server.getServerUri())) + { + client.setProtocols("fastfail"); + client.setTimeout(1,TimeUnit.SECONDS); + try (StacklessLogging scope = new StacklessLogging(WebSocketSession.class)) + { + client.connect(); + client.sendStandardRequest(); + client.expectUpgradeResponse(); + + client.readFrames(1,1,TimeUnit.SECONDS); + + CloseInfo close = new CloseInfo(StatusCode.NORMAL,"Normal"); + client.write(close.asFrame()); // respond with close + + // ensure server socket got close event + assertThat("Fast Fail Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true)); + assertThat("Fast Fail.statusCode",closeSocket.closeStatusCode,is(StatusCode.SERVER_ERROR)); + assertThat("Fast Fail.errors",closeSocket.errors.size(),is(1)); + } + } + } + + private void dropConnection() throws Exception + { + try (BlockheadClient client = new BlockheadClient(server.getServerUri())) + { + client.setProtocols("container"); + client.setTimeout(1,TimeUnit.SECONDS); + try (StacklessLogging scope = new StacklessLogging(WebSocketSession.class)) + { + client.connect(); + client.sendStandardRequest(); + client.expectUpgradeResponse(); + client.disconnect(); + } + } + } +} diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java index 392dba798ad..940829def67 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java @@ -50,7 +50,6 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.Ignore; /** * Tests various close scenarios @@ -218,7 +217,6 @@ public class WebSocketCloseTest * on test failure */ @Test - @Ignore("RELEASE") public void testFastClose() throws Exception { try (BlockheadClient client = new BlockheadClient(server.getServerUri())) @@ -252,7 +250,6 @@ public class WebSocketCloseTest * on test failure */ @Test - @Ignore("RELEASE") public void testFastFail() throws Exception { try (BlockheadClient client = new BlockheadClient(server.getServerUri()))