diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index fa4b1127c6c..8abdfff1d69 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -100,7 +100,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc @Override public void close() throws IOException { - connection.close(); + this.close(StatusCode.NORMAL,null); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index b912abc5eef..f3dffcbb2c4 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -104,7 +104,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override public void succeeded() { - // Lets process the next set of bytes... AbstractWebSocketConnection.this.complete(writeBytes); } } @@ -288,11 +287,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp } } CloseInfo close = new CloseInfo(statusCode,reason); - // TODO: create DisconnectCallback? outgoingFrame(close.asFrame(),new OnCloseCallback()); } - private void execute(Runnable task) + protected void execute(Runnable task) { try { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/WriteBytesProvider.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/WriteBytesProvider.java index 037a6d18283..e34e31b695d 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/WriteBytesProvider.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/WriteBytesProvider.java @@ -165,10 +165,13 @@ public class WriteBytesProvider implements Callback { synchronized (this) { + boolean notified = false; + // fail active (if set) if (active != null) { active.notifyFailure(t); + notified = true; } failure = t; @@ -177,12 +180,16 @@ public class WriteBytesProvider implements Callback for (FrameEntry fe : queue) { fe.notifyFailure(t); + notified = true; } queue.clear(); - // notify flush callback - flushCallback.failed(t); + if (notified) + { + // notify flush callback + flushCallback.failed(t); + } } } diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java index afe2e78a632..0a78c7d596c 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java @@ -68,11 +68,18 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection public void onOpen() { boolean beenOpened = opened.getAndSet(true); + super.onOpen(); if (!beenOpened) { - factory.sessionOpened(getSession()); + execute(new Runnable() + { + @Override + public void run() + { + factory.sessionOpened(getSession()); + } + }); } - super.onOpen(); } @Override diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java new file mode 100644 index 00000000000..cb2770d24d6 --- /dev/null +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java @@ -0,0 +1,161 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.server; + +import static org.hamcrest.Matchers.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.UpgradeRequest; +import org.eclipse.jetty.websocket.api.UpgradeResponse; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.common.CloseInfo; +import org.eclipse.jetty.websocket.common.OpCode; +import org.eclipse.jetty.websocket.common.WebSocketFrame; +import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; +import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture; +import org.eclipse.jetty.websocket.server.helper.RFCSocket; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests various close scenarios + */ +public class WebSocketCloseTest +{ + @SuppressWarnings("serial") + public static class CloseServlet extends WebSocketServlet implements WebSocketCreator + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.setCreator(this); + } + + @Override + public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp) + { + if (req.hasSubProtocol("fastclose")) + { + fastcloseSocket = new FastCloseSocket(); + return fastcloseSocket; + } + return new RFCSocket(); + } + } + + public static class FastCloseSocket extends WebSocketAdapter + { + public CountDownLatch closeLatch = new CountDownLatch(1); + public String closeReason = null; + public int closeStatusCode = -1; + public List errors = new ArrayList<>(); + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + LOG.debug("onWebSocketClose({}, {})",statusCode,reason); + this.closeStatusCode = statusCode; + this.closeReason = reason; + closeLatch.countDown(); + } + + @Override + public void onWebSocketConnect(Session sess) + { + LOG.debug("onWebSocketConnect({})",sess); + try + { + sess.close(); + } + catch (IOException e) + { + e.printStackTrace(System.err); + } + } + + @Override + public void onWebSocketError(Throwable cause) + { + errors.add(cause); + } + } + + private static final Logger LOG = Log.getLogger(WebSocketCloseTest.class); + private static SimpleServletServer server; + private static FastCloseSocket fastcloseSocket; + + @BeforeClass + public static void startServer() throws Exception + { + server = new SimpleServletServer(new CloseServlet()); + server.start(); + } + + @AfterClass + public static void stopServer() + { + server.stop(); + } + + /** + * Test fast close (bug #403817) + */ + @Test + public void testFastClose() throws Exception + { + BlockheadClient client = new BlockheadClient(server.getServerUri()); + client.setProtocols("fastclose"); + client.setTimeout(TimeUnit.SECONDS,1); + try + { + client.connect(); + client.sendStandardRequest(); + client.expectUpgradeResponse(); + + IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1); + WebSocketFrame frame = capture.getFrames().get(0); + Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE)); + CloseInfo close = new CloseInfo(frame); + Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL)); + + client.write(close.asFrame()); // respond with close + + Assert.assertThat("Fast Close Latch",fastcloseSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true)); + Assert.assertThat("Fast Close.statusCode",fastcloseSocket.closeStatusCode,is(StatusCode.NORMAL)); + } + finally + { + client.close(); + } + } +}