From 900dea37195735fd6e43442c3945ac746e2a1776 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Tue, 22 Apr 2014 16:07:45 -0700 Subject: [PATCH] 433262 - WebSocket / Advanced close use cases + ClientCloseTest implementation of various outlined use cases. --- .../endpoints/AbstractJsrEventDriver.java | 3 +- .../endpoints/JsrAnnotatedEventDriver.java | 4 +- .../endpoints/JsrEndpointEventDriver.java | 4 +- .../client/io/ConnectionManager.java | 11 +- .../io/WebSocketClientSelectorManager.java | 7 +- .../websocket/client/ClientCloseTest.java | 626 ++++++++++++++++++ .../websocket/client/ServerWriteThread.java | 18 - .../websocket/client/SlowServerTest.java | 33 +- .../jetty/websocket/client/TimeoutTest.java | 115 ---- .../test/resources/jetty-logging.properties | 9 +- .../jetty/websocket/common/CloseInfo.java | 26 +- .../jetty/websocket/common/Parser.java | 31 +- .../websocket/common/WebSocketSession.java | 66 +- .../common/events/AbstractEventDriver.java | 11 +- .../io/AbstractWebSocketConnection.java | 75 ++- .../websocket/common/io/FrameFlusher.java | 535 ++++++++------- .../jetty/websocket/common/io/IOState.java | 78 ++- .../websocket/common/util/ReflectUtils.java | 2 +- .../jetty/websocket/common/CloseInfoTest.java | 166 +++++ .../websocket/common/ab/TestABCase4.java | 58 +- .../common/test/BlockheadClient.java | 49 +- .../common/test/BlockheadServer.java | 38 +- .../server/WebSocketServerFactory.java | 19 +- .../websocket/server/WebSocketCloseTest.java | 40 +- 24 files changed, 1436 insertions(+), 588 deletions(-) create mode 100644 jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java delete mode 100644 jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/TimeoutTest.java create mode 100644 jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/CloseInfoTest.java diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/AbstractJsrEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/AbstractJsrEventDriver.java index 3b91b387a73..d7b9ced7f14 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/AbstractJsrEventDriver.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/AbstractJsrEventDriver.java @@ -31,11 +31,10 @@ import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.common.CloseInfo; import org.eclipse.jetty.websocket.common.WebSocketSession; import org.eclipse.jetty.websocket.common.events.AbstractEventDriver; -import org.eclipse.jetty.websocket.common.events.EventDriver; import org.eclipse.jetty.websocket.jsr356.JsrSession; import org.eclipse.jetty.websocket.jsr356.metadata.EndpointMetadata; -public abstract class AbstractJsrEventDriver extends AbstractEventDriver implements EventDriver +public abstract class AbstractJsrEventDriver extends AbstractEventDriver { protected final EndpointMetadata metadata; protected final EndpointConfig config; diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java index a550c20b8b4..0cf9df7ab51 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrAnnotatedEventDriver.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.Reader; import java.nio.ByteBuffer; import java.util.Map; + import javax.websocket.CloseReason; import javax.websocket.DecodeException; @@ -31,7 +32,6 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.extensions.Frame; -import org.eclipse.jetty.websocket.common.events.EventDriver; import org.eclipse.jetty.websocket.common.message.MessageInputStream; import org.eclipse.jetty.websocket.common.message.MessageReader; import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage; @@ -44,7 +44,7 @@ import org.eclipse.jetty.websocket.jsr356.messages.TextPartialOnMessage; /** * Base implementation for JSR-356 Annotated event drivers. */ -public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements EventDriver +public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver { private static final Logger LOG = Log.getLogger(JsrAnnotatedEventDriver.class); private final JsrEvents events; diff --git a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java index b977147fb91..9ac49062ace 100644 --- a/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java +++ b/jetty-websocket/javax-websocket-client-impl/src/main/java/org/eclipse/jetty/websocket/jsr356/endpoints/JsrEndpointEventDriver.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.Reader; import java.nio.ByteBuffer; import java.util.Map; + import javax.websocket.CloseReason; import javax.websocket.Endpoint; import javax.websocket.MessageHandler; @@ -34,7 +35,6 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.extensions.Frame; -import org.eclipse.jetty.websocket.common.events.EventDriver; import org.eclipse.jetty.websocket.common.message.MessageInputStream; import org.eclipse.jetty.websocket.common.message.MessageReader; import org.eclipse.jetty.websocket.jsr356.JsrPongMessage; @@ -49,7 +49,7 @@ import org.eclipse.jetty.websocket.jsr356.messages.TextWholeMessage; /** * EventDriver for websocket that extend from {@link javax.websocket.Endpoint} */ -public class JsrEndpointEventDriver extends AbstractJsrEventDriver implements EventDriver +public class JsrEndpointEventDriver extends AbstractJsrEventDriver { private static final Logger LOG = Log.getLogger(JsrEndpointEventDriver.class); diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/ConnectionManager.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/ConnectionManager.java index da112f9cb1e..ab20d77d9de 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/ConnectionManager.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/ConnectionManager.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; @@ -158,7 +159,7 @@ public class ConnectionManager extends ContainerLifeCycle sessions.add(session); } - private void closeAllConnections() + private void shutdownAllConnections() { for (WebSocketSession session : sessions) { @@ -166,11 +167,13 @@ public class ConnectionManager extends ContainerLifeCycle { try { - session.getConnection().close(); + session.getConnection().close( + StatusCode.SHUTDOWN, + "Shutdown"); } catch (Throwable t) { - LOG.debug("During Close All Connections",t); + LOG.debug("During Shutdown All Connections",t); } } } @@ -203,7 +206,7 @@ public class ConnectionManager extends ContainerLifeCycle @Override protected void doStop() throws Exception { - closeAllConnections(); + shutdownAllConnections(); sessions.clear(); super.doStop(); removeBean(selector); diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java index 928e911e278..6616f54f996 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java @@ -98,7 +98,7 @@ public class WebSocketClientSelectorManager extends SelectorManager else { // Standard "ws://" - endPoint.setIdleTimeout(connectPromise.getClient().getMaxIdleTimeout()); + endPoint.setIdleTimeout(connectPromise.getDriver().getPolicy().getIdleTimeout()); return newUpgradeConnection(channel,endPoint,connectPromise); } } @@ -139,4 +139,9 @@ public class WebSocketClientSelectorManager extends SelectorManager { this.sslContextFactory = sslContextFactory; } + + public WebSocketPolicy getPolicy() + { + return policy; + } } diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java new file mode 100644 index 00000000000..3a7f33874bd --- /dev/null +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java @@ -0,0 +1,626 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.client; + +import static org.hamcrest.Matchers.*; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.SelectChannelEndPoint; +import org.eclipse.jetty.io.SelectorManager.ManagedSelector; +import org.eclipse.jetty.toolchain.test.EventQueue; +import org.eclipse.jetty.toolchain.test.TestTracker; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.websocket.api.ProtocolException; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.client.io.ConnectionManager; +import org.eclipse.jetty.websocket.client.io.WebSocketClientSelectorManager; +import org.eclipse.jetty.websocket.common.CloseInfo; +import org.eclipse.jetty.websocket.common.OpCode; +import org.eclipse.jetty.websocket.common.WebSocketFrame; +import org.eclipse.jetty.websocket.common.WebSocketSession; +import org.eclipse.jetty.websocket.common.frames.TextFrame; +import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection; +import org.eclipse.jetty.websocket.common.test.BlockheadServer; +import org.eclipse.jetty.websocket.common.test.BlockheadServer.ServerConnection; +import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture; +import org.eclipse.jetty.websocket.common.test.RawFrameBuilder; +import org.hamcrest.Matcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class ClientCloseTest +{ + private static final Logger LOG = Log.getLogger(ClientCloseTest.class); + + private static class CloseTrackingSocket extends WebSocketAdapter + { + private static final Logger LOG = Log.getLogger(ClientCloseTest.CloseTrackingSocket.class); + + public int closeCode = -1; + public String closeReason = null; + public CountDownLatch closeLatch = new CountDownLatch(1); + public CountDownLatch openLatch = new CountDownLatch(1); + + public EventQueue messageQueue = new EventQueue<>(); + public EventQueue errorQueue = new EventQueue<>(); + + public void assertNoCloseEvent() + { + Assert.assertThat("Client Close Event",closeLatch.getCount(),is(1L)); + Assert.assertThat("Client Close Event Status Code ",closeCode,is(-1)); + } + + public void assertReceivedCloseEvent(int clientTimeoutMs, Matcher statusCodeMatcher, Matcher reasonMatcher) + throws InterruptedException + { + long maxTimeout = clientTimeoutMs * 2; + + Assert.assertThat("Client Close Event Occurred",closeLatch.await(maxTimeout,TimeUnit.MILLISECONDS),is(true)); + Assert.assertThat("Client Close Event Status Code",closeCode,statusCodeMatcher); + if (reasonMatcher == null) + { + Assert.assertThat("Client Close Event Reason",closeReason,nullValue()); + } + else + { + Assert.assertThat("Client Close Event Reason",closeReason,reasonMatcher); + } + } + + public void assertReceivedError(Class expectedThrownClass, Matcher messageMatcher) throws TimeoutException, + InterruptedException + { + errorQueue.awaitEventCount(1,500,TimeUnit.MILLISECONDS); + Throwable actual = errorQueue.poll(); + Assert.assertThat("Client Error Event",actual,instanceOf(expectedThrownClass)); + if (messageMatcher == null) + { + Assert.assertThat("Client Error Event Message",actual.getMessage(),nullValue()); + } + else + { + Assert.assertThat("Client Error Event Message",actual.getMessage(),messageMatcher); + } + } + + public void clearQueues() + { + messageQueue.clear(); + errorQueue.clear(); + } + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + LOG.debug("onWebSocketClose({},{})",statusCode,reason); + super.onWebSocketClose(statusCode,reason); + closeCode = statusCode; + closeReason = reason; + closeLatch.countDown(); + } + + @Override + public void onWebSocketConnect(Session session) + { + super.onWebSocketConnect(session); + openLatch.countDown(); + } + + @Override + public void onWebSocketError(Throwable cause) + { + LOG.debug("onWebSocketError",cause); + Assert.assertThat("Error capture",errorQueue.offer(cause),is(true)); + } + + @Override + public void onWebSocketText(String message) + { + LOG.debug("onWebSocketText({})",message); + messageQueue.offer(message); + } + + public EndPoint getEndPoint() throws Exception + { + Session session = getSession(); + Assert.assertThat("Session type",session,instanceOf(WebSocketSession.class)); + + WebSocketSession wssession = (WebSocketSession)session; + Field fld = wssession.getClass().getDeclaredField("connection"); + fld.setAccessible(true); + Assert.assertThat("Field: connection",fld,notNullValue()); + + Object val = fld.get(wssession); + Assert.assertThat("Connection type",val,instanceOf(AbstractWebSocketConnection.class)); + @SuppressWarnings("resource") + AbstractWebSocketConnection wsconn = (AbstractWebSocketConnection)val; + return wsconn.getEndPoint(); + } + } + + @Rule + public TestTracker tt = new TestTracker(); + + private BlockheadServer server; + private WebSocketClient client; + + private void confirmConnection(CloseTrackingSocket clientSocket, Future clientFuture, ServerConnection serverConn) throws Exception + { + // Wait for client connect on via future + clientFuture.get(500,TimeUnit.MILLISECONDS); + + // Wait for client connect via client websocket + Assert.assertThat("Client WebSocket is Open",clientSocket.openLatch.await(500,TimeUnit.MILLISECONDS),is(true)); + + try + { + // Send message from client to server + final String echoMsg = "echo-test"; + Future testFut = clientSocket.getRemote().sendStringByFuture(echoMsg); + + // Wait for send future + testFut.get(500,TimeUnit.MILLISECONDS); + + // Read Frame on server side + IncomingFramesCapture serverCapture = serverConn.readFrames(1,500,TimeUnit.MILLISECONDS); + serverCapture.assertNoErrors(); + serverCapture.assertFrameCount(1); + WebSocketFrame frame = serverCapture.getFrames().poll(); + Assert.assertThat("Server received frame",frame.getOpCode(),is(OpCode.TEXT)); + Assert.assertThat("Server received frame payload",frame.getPayloadAsUTF8(),is(echoMsg)); + + // Server send echo reply + serverConn.write(new TextFrame().setPayload(echoMsg)); + + // Wait for received echo + clientSocket.messageQueue.awaitEventCount(1,1,TimeUnit.SECONDS); + + // Verify received message + String recvMsg = clientSocket.messageQueue.poll(); + Assert.assertThat("Received message",recvMsg,is(echoMsg)); + + // Verify that there are no errors + Assert.assertThat("Error events",clientSocket.errorQueue,empty()); + } + finally + { + clientSocket.clearQueues(); + } + } + + private void confirmServerReceivedCloseFrame(ServerConnection serverConn, int expectedCloseCode, Matcher closeReasonMatcher) throws IOException, + TimeoutException + { + IncomingFramesCapture serverCapture = serverConn.readFrames(1,500,TimeUnit.MILLISECONDS); + serverCapture.assertNoErrors(); + serverCapture.assertFrameCount(1); + serverCapture.assertHasFrame(OpCode.CLOSE,1); + WebSocketFrame frame = serverCapture.getFrames().poll(); + Assert.assertThat("Server received close frame",frame.getOpCode(),is(OpCode.CLOSE)); + CloseInfo closeInfo = new CloseInfo(frame); + Assert.assertThat("Server received close code",closeInfo.getStatusCode(),is(expectedCloseCode)); + if (closeReasonMatcher == null) + { + Assert.assertThat("Server received close reason",closeInfo.getReason(),nullValue()); + } + else + { + Assert.assertThat("Server received close reason",closeInfo.getReason(),closeReasonMatcher); + } + } + + public static class TestWebSocketClient extends WebSocketClient + { + @Override + protected ConnectionManager newConnectionManager() + { + return new TestConnectionManager(this); + } + } + + public static class TestConnectionManager extends ConnectionManager + { + public TestConnectionManager(WebSocketClient client) + { + super(client); + } + + @Override + protected WebSocketClientSelectorManager newWebSocketClientSelectorManager(WebSocketClient client) + { + return new TestSelectorManager(client); + } + } + + public static class TestSelectorManager extends WebSocketClientSelectorManager + { + public TestSelectorManager(WebSocketClient client) + { + super(client); + } + + @Override + protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException + { + return new TestEndPoint(channel,selectSet,selectionKey,getScheduler(),getPolicy().getIdleTimeout()); + } + } + + public static class TestEndPoint extends SelectChannelEndPoint + { + public AtomicBoolean congestedFlush = new AtomicBoolean(false); + + public TestEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout) + { + super(channel,selector,key,scheduler,idleTimeout); + } + + @Override + public boolean flush(ByteBuffer... buffers) throws IOException + { + boolean flushed = super.flush(buffers); + congestedFlush.set(!flushed); + return flushed; + } + } + + @Before + public void startClient() throws Exception + { + client = new TestWebSocketClient(); + client.start(); + } + + @Before + public void startServer() throws Exception + { + server = new BlockheadServer(); + server.start(); + } + + @After + public void stopClient() throws Exception + { + if (client.isRunning()) + { + client.stop(); + } + } + + @After + public void stopServer() throws Exception + { + server.stop(); + } + + @Test + public void testHalfClose() throws Exception + { + // Set client timeout + final int timeout = 1000; + client.setMaxIdleTimeout(timeout); + + // Client connects + CloseTrackingSocket clientSocket = new CloseTrackingSocket(); + Future clientConnectFuture = client.connect(clientSocket,server.getWsUri()); + + // Server accepts connect + ServerConnection serverConn = server.accept(); + serverConn.upgrade(); + + // client confirms connection via echo + confirmConnection(clientSocket,clientConnectFuture,serverConn); + + // client sends close frame (code 1000, normal) + final String origCloseReason = "Normal Close"; + clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason); + + // server receives close frame + confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason)); + + // server sends 2 messages + serverConn.write(new TextFrame().setPayload("Hello")); + serverConn.write(new TextFrame().setPayload("World")); + + // server sends close frame (code 1000, no reason) + CloseInfo sclose = new CloseInfo(StatusCode.NORMAL,"From Server"); + serverConn.write(sclose.asFrame()); + + // client receives 2 messages + clientSocket.messageQueue.awaitEventCount(2,1,TimeUnit.SECONDS); + + // Verify received messages + String recvMsg = clientSocket.messageQueue.poll(); + Assert.assertThat("Received message 1",recvMsg,is("Hello")); + recvMsg = clientSocket.messageQueue.poll(); + Assert.assertThat("Received message 2",recvMsg,is("World")); + + // Verify that there are no errors + Assert.assertThat("Error events",clientSocket.errorQueue,empty()); + + // client close event on ws-endpoint + clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.NORMAL),containsString("From Server")); + } + + @Test + public void testNetworkCongestion() throws Exception + { + // Set client timeout + final int timeout = 1000; + client.setMaxIdleTimeout(timeout); + + // Client connects + CloseTrackingSocket clientSocket = new CloseTrackingSocket(); + Future clientConnectFuture = client.connect(clientSocket,server.getWsUri()); + + // Server accepts connect + ServerConnection serverConn = server.accept(); + serverConn.upgrade(); + + // client confirms connection via echo + confirmConnection(clientSocket,clientConnectFuture,serverConn); + + // client sends BIG frames (until it cannot write anymore) + // server must not read (for test purpose, in order to congest connection) + // when write is congested, client enqueue close frame + // client initiate write, but write never completes + EndPoint endp = clientSocket.getEndPoint(); + Assert.assertThat("EndPoint is testable",endp,instanceOf(TestEndPoint.class)); + TestEndPoint testendp = (TestEndPoint)endp; + + char msg[] = new char[10240]; + int writeCount = 0; + long writeSize = 0; + int i = 0; + while (!testendp.congestedFlush.get()) + { + int z = i - ((i / 26) * 26); + char c = (char)('a' + z); + Arrays.fill(msg,c); + clientSocket.getRemote().sendStringByFuture(String.valueOf(msg)); + writeCount++; + writeSize += msg.length; + } + LOG.debug("Wrote {} frames totalling {} bytes of payload before congestion kicked in",writeCount,writeSize); + + // Verify that there are no errors + Assert.assertThat("Error events",clientSocket.errorQueue,empty()); + + // client idle timeout triggers close event on client ws-endpoint + // client close event on ws-endpoint + clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("Timeout")); + } + + @Test + public void testProtocolException() throws Exception + { + // Set client timeout + final int timeout = 1000; + client.setMaxIdleTimeout(timeout); + + // Client connects + CloseTrackingSocket clientSocket = new CloseTrackingSocket(); + Future clientConnectFuture = client.connect(clientSocket,server.getWsUri()); + + // Server accepts connect + ServerConnection serverConn = server.accept(); + serverConn.upgrade(); + + // client confirms connection via echo + confirmConnection(clientSocket,clientConnectFuture,serverConn); + + // client should not have received close message (yet) + clientSocket.assertNoCloseEvent(); + + // server sends bad close frame (too big of a reason message) + byte msg[] = new byte[400]; + Arrays.fill(msg,(byte)'x'); + ByteBuffer bad = ByteBuffer.allocate(500); + RawFrameBuilder.putOpFin(bad,OpCode.CLOSE,true); + RawFrameBuilder.putLength(bad,msg.length + 2,false); + bad.putShort((short)StatusCode.NORMAL); + bad.put(msg); + BufferUtil.flipToFlush(bad,0); + serverConn.write(bad); + + // client should have noticed the error + clientSocket.assertReceivedError(ProtocolException.class,containsString("Invalid control frame")); + + // client parse invalid frame, notifies server of close (protocol error) + confirmServerReceivedCloseFrame(serverConn,StatusCode.PROTOCOL,allOf(containsString("Invalid control frame"),containsString("length"))); + + // server disconnects + serverConn.disconnect(); + + // client triggers close event on client ws-endpoint + clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.PROTOCOL),allOf(containsString("Invalid control frame"),containsString("length"))); + } + + @Test + public void testReadEOF() throws Exception + { + // Set client timeout + final int timeout = 1000; + client.setMaxIdleTimeout(timeout); + + // Client connects + CloseTrackingSocket clientSocket = new CloseTrackingSocket(); + Future clientConnectFuture = client.connect(clientSocket,server.getWsUri()); + + // Server accepts connect + ServerConnection serverConn = server.accept(); + serverConn.upgrade(); + + // client confirms connection via echo + confirmConnection(clientSocket,clientConnectFuture,serverConn); + + // client sends close frame + final String origCloseReason = "Normal Close"; + clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason); + + // server receives close frame + confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason)); + + // client should not have received close message (yet) + clientSocket.assertNoCloseEvent(); + + // server shuts down connection (no frame reply) + serverConn.disconnect(); + + // client reads -1 (EOF) + // client triggers close event on client ws-endpoint + clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("EOF")); + } + + @Test + public void testServerNoCloseHandshake() throws Exception + { + // Set client timeout + final int timeout = 1000; + client.setMaxIdleTimeout(timeout); + + // Client connects + CloseTrackingSocket clientSocket = new CloseTrackingSocket(); + Future clientConnectFuture = client.connect(clientSocket,server.getWsUri()); + + // Server accepts connect + ServerConnection serverConn = server.accept(); + serverConn.upgrade(); + + // client confirms connection via echo + confirmConnection(clientSocket,clientConnectFuture,serverConn); + + // client sends close frame + final String origCloseReason = "Normal Close"; + clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason); + + // server receives close frame + confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason)); + + // client should not have received close message (yet) + clientSocket.assertNoCloseEvent(); + + // server never sends close frame handshake + // server sits idle + + // client idle timeout triggers close event on client ws-endpoint + // assert - close code==1006 (abnormal) + // assert - close reason message contains (timeout) + clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("Timeout")); + } + + @Test + public void testStopLifecycle() throws Exception + { + // Set client timeout + final int timeout = 1000; + client.setMaxIdleTimeout(timeout); + + int clientCount = 3; + CloseTrackingSocket clientSockets[] = new CloseTrackingSocket[clientCount]; + ServerConnection serverConns[] = new ServerConnection[clientCount]; + + // Connect Multiple Clients + for (int i = 0; i < clientCount; i++) + { + // Client Request Upgrade + clientSockets[i] = new CloseTrackingSocket(); + Future clientConnectFuture = client.connect(clientSockets[i],server.getWsUri()); + + // Server accepts connection + serverConns[i] = server.accept(); + serverConns[i].upgrade(); + + // client confirms connection via echo + confirmConnection(clientSockets[i],clientConnectFuture,serverConns[i]); + } + + // client lifecycle stop + client.stop(); + + // clients send close frames (code 1001, shutdown) + for (int i = 0; i < clientCount; i++) + { + // server receives close frame + confirmServerReceivedCloseFrame(serverConns[i],StatusCode.SHUTDOWN,containsString("Shutdown")); + } + + // clients disconnect + for (int i = 0; i < clientCount; i++) + { + clientSockets[i].assertReceivedCloseEvent(timeout,is(StatusCode.SHUTDOWN),containsString("Shutdown")); + } + } + + @Test + public void testWriteException() throws Exception + { + // Set client timeout + final int timeout = 1000; + client.setMaxIdleTimeout(timeout); + + // Client connects + CloseTrackingSocket clientSocket = new CloseTrackingSocket(); + Future clientConnectFuture = client.connect(clientSocket,server.getWsUri()); + + // Server accepts connect + ServerConnection serverConn = server.accept(); + serverConn.upgrade(); + + // client confirms connection via echo + confirmConnection(clientSocket,clientConnectFuture,serverConn); + + // setup client endpoint for write failure (test only) + EndPoint endp = clientSocket.getEndPoint(); + endp.shutdownOutput(); + + // client enqueue close frame + // client write failure + final String origCloseReason = "Normal Close"; + clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason); + + clientSocket.assertReceivedError(EofException.class,null); + + // client triggers close event on client ws-endpoint + // assert - close code==1006 (abnormal) + // assert - close reason message contains (write failure) + clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("EOF")); + } +} diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerWriteThread.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerWriteThread.java index e06e883a6e2..2cea2fda81a 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerWriteThread.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ServerWriteThread.java @@ -19,7 +19,6 @@ package org.eclipse.jetty.websocket.client; import java.io.IOException; -import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -32,7 +31,6 @@ public class ServerWriteThread extends Thread { private static final Logger LOG = Log.getLogger(ServerWriteThread.class); private final ServerConnection conn; - private Exchanger exchanger; private int slowness = -1; private int messageCount = 100; private String message = "Hello"; @@ -42,11 +40,6 @@ public class ServerWriteThread extends Thread this.conn = conn; } - public Exchanger getExchanger() - { - return exchanger; - } - public String getMessage() { return message; @@ -73,12 +66,6 @@ public class ServerWriteThread extends Thread { conn.write(new TextFrame().setPayload(message)); - if (exchanger != null) - { - // synchronized on exchange - exchanger.exchange(message); - } - m.incrementAndGet(); if (slowness > 0) @@ -93,11 +80,6 @@ public class ServerWriteThread extends Thread } } - public void setExchanger(Exchanger exchanger) - { - this.exchanger = exchanger; - } - public void setMessage(String message) { this.message = message; diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java index 80720f329f3..4f56ec6d5b0 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/SlowServerTest.java @@ -49,7 +49,7 @@ public class SlowServerTest public void startClient() throws Exception { client = new WebSocketClient(); - client.getPolicy().setIdleTimeout(60000); + client.setMaxIdleTimeout(60000); client.start(); } @@ -78,7 +78,7 @@ public class SlowServerTest { JettyTrackingSocket tsocket = new JettyTrackingSocket(); client.setMasker(new ZeroMasker()); - client.getPolicy().setIdleTimeout(60000); + client.setMaxIdleTimeout(60000); URI wsUri = server.getWsUri(); Future future = client.connect(tsocket,wsUri); @@ -123,41 +123,38 @@ public class SlowServerTest @Slow public void testServerSlowToSend() throws Exception { - // final Exchanger exchanger = new Exchanger(); - JettyTrackingSocket tsocket = new JettyTrackingSocket(); - // tsocket.messageExchanger = exchanger; + JettyTrackingSocket clientSocket = new JettyTrackingSocket(); client.setMasker(new ZeroMasker()); - client.getPolicy().setIdleTimeout(60000); + client.setMaxIdleTimeout(60000); URI wsUri = server.getWsUri(); - Future future = client.connect(tsocket,wsUri); + Future clientConnectFuture = client.connect(clientSocket,wsUri); - ServerConnection sconnection = server.accept(); - sconnection.setSoTimeout(60000); - sconnection.upgrade(); + ServerConnection serverConn = server.accept(); + serverConn.setSoTimeout(60000); + serverConn.upgrade(); // Confirm connected - future.get(500,TimeUnit.MILLISECONDS); - tsocket.waitForConnected(500,TimeUnit.MILLISECONDS); + clientConnectFuture.get(500,TimeUnit.MILLISECONDS); + clientSocket.waitForConnected(500,TimeUnit.MILLISECONDS); // Have server write slowly. int messageCount = 1000; - ServerWriteThread writer = new ServerWriteThread(sconnection); + ServerWriteThread writer = new ServerWriteThread(serverConn); writer.setMessageCount(messageCount); writer.setMessage("Hello"); - // writer.setExchanger(exchanger); writer.setSlowness(10); writer.start(); writer.join(); // Verify receive - Assert.assertThat("Message Receive Count",tsocket.messageQueue.size(),is(messageCount)); + Assert.assertThat("Message Receive Count",clientSocket.messageQueue.size(),is(messageCount)); // Close - sconnection.close(StatusCode.NORMAL); + serverConn.close(StatusCode.NORMAL); - Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(10,TimeUnit.SECONDS)); - tsocket.assertCloseCode(StatusCode.NORMAL); + Assert.assertTrue("Client Socket Closed",clientSocket.closeLatch.await(10,TimeUnit.SECONDS)); + clientSocket.assertCloseCode(StatusCode.NORMAL); } } diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/TimeoutTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/TimeoutTest.java deleted file mode 100644 index 72c1fc19545..00000000000 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/TimeoutTest.java +++ /dev/null @@ -1,115 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.websocket.client; - -import static org.hamcrest.Matchers.lessThanOrEqualTo; - -import java.net.URI; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.eclipse.jetty.toolchain.test.TestTracker; -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.StatusCode; -import org.eclipse.jetty.websocket.common.test.BlockheadServer; -import org.eclipse.jetty.websocket.common.test.BlockheadServer.ServerConnection; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -/** - * Various tests for Timeout handling - */ -public class TimeoutTest -{ - @Rule - public TestTracker tt = new TestTracker(); - - private BlockheadServer server; - private WebSocketClient client; - - @Before - public void startClient() throws Exception - { - client = new WebSocketClient(); - client.getPolicy().setIdleTimeout(250); // idle timeout (for all tests here) - client.start(); - } - - @Before - public void startServer() throws Exception - { - server = new BlockheadServer(); - server.start(); - } - - @After - public void stopClient() throws Exception - { - client.stop(); - } - - @After - public void stopServer() throws Exception - { - server.stop(); - } - - /** - * In a situation where the upgrade/connection is successful, and there is no activity for a while, the idle timeout triggers on the client side and - * automatically initiates a close handshake. - */ - @Test - public void testIdleDetectedByClient() throws Exception - { - JettyTrackingSocket wsocket = new JettyTrackingSocket(); - - URI wsUri = server.getWsUri(); - client.setMaxIdleTimeout(1000); - Future future = client.connect(wsocket,wsUri); - - ServerConnection ssocket = server.accept(); - ssocket.upgrade(); - - try - { - ssocket.startEcho(); - // Validate that connect occurred - future.get(500,TimeUnit.MILLISECONDS); - wsocket.waitForConnected(500,TimeUnit.MILLISECONDS); - - // Wait for inactivity idle timeout. - long start = System.currentTimeMillis(); - wsocket.waitForClose(2,TimeUnit.SECONDS); - long end = System.currentTimeMillis(); - long dur = (end - start); - // Make sure idle timeout takes less than 5 total seconds - Assert.assertThat("Idle Timeout",dur,lessThanOrEqualTo(3000L)); - - // Client should see a close event, with abnormal status NO_CLOSE - wsocket.assertCloseCode(StatusCode.ABNORMAL); - } - finally - { - ssocket.stopEcho(); - } - } -} diff --git a/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties index 7c9bd368345..9668b131052 100644 --- a/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties @@ -1,12 +1,17 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog org.eclipse.jetty.LEVEL=WARN # org.eclipse.jetty.LEVEL=DEBUG -# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=INFO +# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=DEBUG +# org.eclipse.jetty.io.SelectChannelEndPoint.LEVEL=DEBUG +# org.eclipse.jetty.io.IdleTimeout.LEVEL=DEBUG +# org.eclipse.jetty.io.FillInterest.LEVEL=DEBUG +# org.eclipse.jetty.io.AbstractConnection.LEVEL=DEBUG # org.eclipse.jetty.websocket.LEVEL=WARN # org.eclipse.jetty.websocket.LEVEL=DEBUG # org.eclipse.jetty.websocket.client.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG -# org.eclipse.jetty.websocket.common.io.WriteBytesProvider.LEVEL=DEBUG +# org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG + # org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG # org.eclipse.jetty.websocket.client.TrackingSocket.LEVEL=DEBUG diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java index 03a737c1435..f14c25241ae 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.common; import java.nio.ByteBuffer; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception; import org.eclipse.jetty.util.Utf8StringBuilder; @@ -65,7 +66,8 @@ public class CloseInfo statusCode |= (data.get() & 0xFF) << 8; statusCode |= (data.get() & 0xFF); - if(validate) { + if (validate) + { if ((statusCode < StatusCode.NORMAL) || (statusCode == StatusCode.UNDEFINED) || (statusCode == StatusCode.NO_CLOSE) || (statusCode == StatusCode.NO_CODE) || ((statusCode > 1011) && (statusCode <= 2999)) || (statusCode >= 5000)) { @@ -120,7 +122,7 @@ public class CloseInfo public CloseInfo(int statusCode) { - this(statusCode, null); + this(statusCode,null); } public CloseInfo(int statusCode, String reason) @@ -144,8 +146,9 @@ public class CloseInfo utf = StringUtil.getUtf8Bytes(reason); len += utf.length; } - - ByteBuffer buf = ByteBuffer.allocate(len); + + ByteBuffer buf = BufferUtil.allocate(len); + BufferUtil.flipToFill(buf); buf.put((byte)((statusCode >>> 8) & 0xFF)); buf.put((byte)((statusCode >>> 0) & 0xFF)); @@ -153,7 +156,7 @@ public class CloseInfo { buf.put(utf,0,utf.length); } - buf.flip(); + BufferUtil.flipToFlush(buf,0); return buf; } @@ -162,7 +165,14 @@ public class CloseInfo { CloseFrame frame = new CloseFrame(); frame.setFin(true); - frame.setPayload(asByteBuffer()); + if ((statusCode >= 1000) && (statusCode != StatusCode.NO_CLOSE) && (statusCode != StatusCode.NO_CODE)) + { + if (statusCode == StatusCode.FAILED_TLS_HANDSHAKE) + { + throw new ProtocolException("Close Frame with status code " + statusCode + " not allowed (per RFC6455)"); + } + frame.setPayload(asByteBuffer()); + } return frame; } @@ -180,10 +190,10 @@ public class CloseInfo { return !((statusCode == StatusCode.NORMAL) || (statusCode == StatusCode.NO_CODE)); } - + public boolean isAbnormal() { - return (statusCode == StatusCode.ABNORMAL); + return (statusCode != StatusCode.NORMAL); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java index 0cf7e5f4b2c..86195a0289d 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java @@ -96,8 +96,9 @@ public class Parser private void assertSanePayloadLength(long len) { - if (LOG.isDebugEnabled()) - LOG.debug("Payload Length: {} - {}",len,this); + if (LOG.isDebugEnabled()) { + LOG.debug("{} Payload Length: {} - {}",policy.getBehavior(),len,this); + } // Since we use ByteBuffer so often, having lengths over Integer.MAX_VALUE is really impossible. if (len > Integer.MAX_VALUE) @@ -239,7 +240,7 @@ public class Parser incomingFramesHandler.incomingError(e); } - public void parse(ByteBuffer buffer) + public void parse(ByteBuffer buffer) throws WebSocketException { if (buffer.remaining() <= 0) { @@ -266,13 +267,20 @@ public class Parser { buffer.position(buffer.limit()); // consume remaining reset(); + // let session know notifyWebSocketException(e); + // need to throw for proper close behavior in connection + throw e; } catch (Throwable t) { buffer.position(buffer.limit()); // consume remaining reset(); - notifyWebSocketException(new WebSocketException(t)); + // let session know + WebSocketException e = new WebSocketException(t); + notifyWebSocketException(e); + // need to throw for proper close behavior in connection + throw e; } } @@ -299,7 +307,9 @@ public class Parser private boolean parseFrame(ByteBuffer buffer) { if (LOG.isDebugEnabled()) + { LOG.debug("{} Parsing {} bytes",policy.getBehavior(),buffer.remaining()); + } while (buffer.hasRemaining()) { switch (state) @@ -318,7 +328,8 @@ public class Parser } if (LOG.isDebugEnabled()) - LOG.debug("OpCode {}, fin={} rsv={}{}{}", + LOG.debug("{} OpCode {}, fin={} rsv={}{}{}", + policy.getBehavior(), OpCode.name(opcode), fin, (isRsv1InUse()?'1':'.'), @@ -412,11 +423,6 @@ public class Parser throw new ProtocolException("RSV3 not allowed to be set"); } } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("OpCode {}, fin={} rsv=000",OpCode.name(opcode),fin); - } state = State.PAYLOAD_LEN; break; @@ -591,8 +597,9 @@ public class Parser buffer.limit(limit); buffer.position(buffer.position() + window.remaining()); - if (LOG.isDebugEnabled()) - LOG.debug("Window: {}",BufferUtil.toDetailString(window)); + if (LOG.isDebugEnabled()) { + LOG.debug("{} Window: {}",policy.getBehavior(),BufferUtil.toDetailString(window)); + } maskProcessor.process(window); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 460d1d73b54..04637e392da 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -41,6 +41,7 @@ import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeResponse; +import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory; @@ -90,20 +91,19 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc @Override public void close() { - this.close(StatusCode.NORMAL, null); + this.close(StatusCode.NORMAL,null); } @Override public void close(CloseStatus closeStatus) { - this.close(closeStatus.getCode(), closeStatus.getPhrase()); + this.close(closeStatus.getCode(),closeStatus.getPhrase()); } @Override public void close(int statusCode, String reason) { - connection.close(statusCode, reason); - notifyClose(statusCode, reason); + connection.close(statusCode,reason); } /** @@ -115,7 +115,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc connection.disconnect(); // notify of harsh disconnect - notifyClose(StatusCode.NO_CLOSE, "Harsh disconnect"); + notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect"); } public void dispatch(Runnable runnable) @@ -130,7 +130,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc out.append(indent).append(" +- incomingHandler : "); if (incomingHandler instanceof Dumpable) { - ((Dumpable)incomingHandler).dump(out, indent + " "); + ((Dumpable)incomingHandler).dump(out,indent + " "); } else { @@ -140,7 +140,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc out.append(indent).append(" +- outgoingHandler : "); if (outgoingHandler instanceof Dumpable) { - ((Dumpable)outgoingHandler).dump(out, indent + " "); + ((Dumpable)outgoingHandler).dump(out,indent + " "); } else { @@ -273,7 +273,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc { final int prime = 31; int result = 1; - result = (prime * result) + ((connection == null) ? 0 : connection.hashCode()); + result = (prime * result) + ((connection == null)?0:connection.hashCode()); return result; } @@ -328,7 +328,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc public void notifyClose(int statusCode, String reason) { - websocket.onClose(new CloseInfo(statusCode, reason)); + if (LOG.isDebugEnabled()) + { + LOG.debug("notifyClose({},{})",statusCode,reason); + } + websocket.onClose(new CloseInfo(statusCode,reason)); } public void notifyError(Throwable cause) @@ -342,12 +346,13 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc { switch (state) { - case CLOSING: + case CLOSED: // notify session listeners for (SessionListener listener : sessionListeners) { try { + LOG.debug("{}.onSessionClosed()",listener.getClass().getSimpleName()); listener.onSessionClosed(this); } catch (Throwable t) @@ -355,12 +360,10 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc LOG.ignore(t); } } - break; - case CLOSED: IOState ioState = this.connection.getIOState(); CloseInfo close = ioState.getCloseInfo(); // confirmed close of local endpoint - notifyClose(close.getStatusCode(), close.getReason()); + notifyClose(close.getStatusCode(),close.getReason()); break; case OPEN: // notify session listeners @@ -394,17 +397,32 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc connection.getIOState().onConnected(); // Connect remote - remote = new WebSocketRemoteEndpoint(connection, outgoingHandler, getBatchMode()); + remote = new WebSocketRemoteEndpoint(connection,outgoingHandler,getBatchMode()); - // Open WebSocket - websocket.openSession(this); - - // Open connection - connection.getIOState().onOpened(); - - if (LOG.isDebugEnabled()) + try { - LOG.debug("open -> {}", dump()); + // Open WebSocket + websocket.openSession(this); + + // Open connection + connection.getIOState().onOpened(); + + if (LOG.isDebugEnabled()) + { + LOG.debug("open -> {}",dump()); + } + } + catch (Throwable t) + { + // Exception on end-user WS-Endpoint. + // Fast-fail & close connection with reason. + int statusCode = StatusCode.SERVER_ERROR; + if(policy.getBehavior() == WebSocketBehavior.CLIENT) + { + statusCode = StatusCode.POLICY_VIOLATION; + } + + close(statusCode,t.getMessage()); } } @@ -444,11 +462,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc List values = entry.getValue(); if (values != null) { - this.parameterMap.put(entry.getKey(), values.toArray(new String[values.size()])); + this.parameterMap.put(entry.getKey(),values.toArray(new String[values.size()])); } else { - this.parameterMap.put(entry.getKey(), new String[0]); + this.parameterMap.put(entry.getKey(),new String[0]); } } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/AbstractEventDriver.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/AbstractEventDriver.java index 8cc60f06c43..7a55b2db4f9 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/AbstractEventDriver.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/AbstractEventDriver.java @@ -88,13 +88,7 @@ public abstract class AbstractEventDriver implements IncomingFrames, EventDriver { if (LOG.isDebugEnabled()) { - LOG.debug("incoming(WebSocketException)",e); - } - - if (e instanceof CloseException) - { - CloseException close = (CloseException)e; - terminateConnection(close.getStatusCode(),close.getMessage()); + LOG.debug("incomingError(" + e.getClass().getName() + ")",e); } onError(e); @@ -105,7 +99,7 @@ public abstract class AbstractEventDriver implements IncomingFrames, EventDriver { if (LOG.isDebugEnabled()) { - LOG.debug("{}.onFrame({})",websocket.getClass().getSimpleName(),frame); + LOG.debug("incomingFrame({})",frame); } try @@ -226,6 +220,7 @@ public abstract class AbstractEventDriver implements IncomingFrames, EventDriver catch (Throwable t) { unhandled(t); + throw t; } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index c9f98bb9d57..cea5909865c 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; @@ -54,13 +53,13 @@ import org.eclipse.jetty.websocket.common.CloseInfo; import org.eclipse.jetty.websocket.common.ConnectionState; import org.eclipse.jetty.websocket.common.Generator; import org.eclipse.jetty.websocket.common.LogicalConnection; +import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.Parser; import org.eclipse.jetty.websocket.common.WebSocketSession; import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener; /** - * Provides the implementation of {@link LogicalConnection} within the - * framework of the new {@link Connection} framework of {@code jetty-io}. + * Provides the implementation of {@link LogicalConnection} within the framework of the new {@link Connection} framework of {@code jetty-io}. */ public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener, Dumpable { @@ -68,7 +67,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint) { - super(bufferPool, generator, endpoint, getPolicy().getMaxBinaryMessageBufferSize(), 8); + super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8); } @Override @@ -106,7 +105,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp // Abnormal Close reason = CloseStatus.trimMaxReasonLength(reason); session.notifyError(x); - session.notifyClose(StatusCode.NO_CLOSE,reason); + session.notifyClose(StatusCode.ABNORMAL,reason); disconnect(); // disconnect endpoint & connection } @@ -116,10 +115,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { private final boolean outputOnly; - public OnDisconnectCallback(boolean outputOnly) { + public OnDisconnectCallback(boolean outputOnly) + { this.outputOnly = outputOnly; } - + @Override public void writeFailed(Throwable x) { @@ -218,10 +218,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override public void close(int statusCode, String reason) { + LOG.debug("close({},{})",statusCode,reason); CloseInfo close = new CloseInfo(statusCode,reason); if (statusCode == StatusCode.ABNORMAL) { - flusher.close(); // TODO this makes the IdleTimeoutTest pass, but I'm dubious it is the correct way ioState.onAbnormalClose(close); } else @@ -230,7 +230,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp } } - @Override public void disconnect() { @@ -366,7 +365,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override public void onClose() { + LOG.debug("{} onClose()",policy.getBehavior()); super.onClose(); + // ioState.onDisconnected(); flusher.close(); } @@ -385,18 +386,15 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { // Fire out a close frame, indicating abnormal shutdown, then disconnect CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason()); - outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false), BatchMode.OFF); - } - else - { - // Just disconnect - this.disconnect(false); + outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF); } + // Just disconnect + this.disconnect(false); break; case CLOSING: CloseInfo close = ioState.getCloseInfo(); // reply to close handshake from remote - outgoingFrame(close.asFrame(),new OnDisconnectCallback(true), BatchMode.OFF); + outgoingFrame(close.asFrame(),new OnDisconnectCallback(true),BatchMode.OFF); default: break; } @@ -447,20 +445,26 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override protected boolean onReadTimeout() { - LOG.debug("{} Read Timeout",policy.getBehavior()); - IOState state = getIOState(); - if ((state.getConnectionState() == ConnectionState.CLOSING) || (state.getConnectionState() == ConnectionState.CLOSED)) + ConnectionState cstate = state.getConnectionState(); + LOG.debug("{} Read Timeout - {}",policy.getBehavior(),cstate); + + if (cstate == ConnectionState.CLOSED) { - // close already initiated, extra timeouts not relevant + // close already completed, extra timeouts not relevant // allow underlying connection and endpoint to disconnect on its own return true; } - // Initiate close - politely send close frame. - session.notifyError(new SocketTimeoutException("Timeout on Read")); - // This is an Abnormal Close condition - close(StatusCode.ABNORMAL,"Idle Timeout"); + try + { + session.notifyError(new SocketTimeoutException("Timeout on Read")); + } + finally + { + // This is an Abnormal Close condition + close(StatusCode.ABNORMAL,"Idle Timeout"); + } return false; } @@ -476,7 +480,21 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp LOG.debug("outgoingFrame({}, {})",frame,callback); } - flusher.enqueue(frame,callback, batchMode); + CloseInfo close = null; + // grab a copy of the frame details before masking and whatnot + if (frame.getOpCode() == OpCode.CLOSE) + { + close = new CloseInfo(frame); + } + + flusher.enqueue(frame,callback,batchMode); + + // now trigger local close + if (close != null) + { + LOG.debug("outgoing CLOSE frame - {}: {}",frame,close); + ioState.onCloseLocal(close); + } } private int read(ByteBuffer buffer) @@ -504,7 +522,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer)); } parser.parse(buffer); - // TODO: has the end user application already consumed what it was given? } } } @@ -520,6 +537,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp close(e.getStatusCode(),e.getMessage()); return -1; } + catch (Throwable t) + { + LOG.warn(t); + close(StatusCode.ABNORMAL,t.getMessage()); + return -1; + } } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java index a7bb3cb8c7d..6e995e579fc 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java @@ -29,7 +29,6 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.ArrayQueue; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -45,246 +44,22 @@ import org.eclipse.jetty.websocket.common.frames.BinaryFrame; */ public class FrameFlusher { - public static final BinaryFrame FLUSH_FRAME = new BinaryFrame(); - private static final Logger LOG = Log.getLogger(FrameFlusher.class); - - private final ByteBufferPool bufferPool; - private final EndPoint endpoint; - private final int bufferSize; - private final Generator generator; - private final int maxGather; - private final Object lock = new Object(); - private final ArrayQueue queue = new ArrayQueue<>(16, 16, lock); - private final Flusher flusher = new Flusher(); - private final AtomicBoolean closed = new AtomicBoolean(); - private volatile Throwable failure; - - public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather) - { - this.bufferPool = bufferPool; - this.endpoint = endpoint; - this.bufferSize = bufferSize; - this.generator = Objects.requireNonNull(generator); - this.maxGather = maxGather; - } - - public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode) - { - if (closed.get()) - { - notifyCallbackFailure(callback, new EOFException("Connection has been closed locally")); - return; - } - if (flusher.isFailed()) - { - notifyCallbackFailure(callback, failure); - return; - } - - FrameEntry entry = new FrameEntry(frame, callback, batchMode); - - synchronized (lock) - { - switch (frame.getOpCode()) - { - case OpCode.PING: - { - // Prepend PINGs so they are processed first. - queue.add(0, entry); - break; - } - case OpCode.CLOSE: - { - // There may be a chance that other frames are - // added after this close frame, but we will - // fail them later to keep it simple here. - closed.set(true); - queue.add(entry); - break; - } - default: - { - queue.add(entry); - break; - } - } - } - - if (LOG.isDebugEnabled()) - LOG.debug("{} queued {}", this, entry); - - flusher.iterate(); - } - - public void close() - { - if (closed.compareAndSet(false, true)) - { - LOG.debug("{} closing {}", this); - EOFException eof = new EOFException("Connection has been closed locally"); - flusher.failed(eof); - - // Fail also queued entries. - List entries = new ArrayList<>(); - synchronized (lock) - { - entries.addAll(queue); - queue.clear(); - } - // Notify outside sync block. - for (FrameEntry entry : entries) - notifyCallbackFailure(entry.callback, eof); - } - } - - protected void onFailure(Throwable x) - { - LOG.warn(x); - } - - protected void notifyCallbackSuccess(WriteCallback callback) - { - try - { - if (callback != null) - callback.writeSuccess(); - } - catch (Throwable x) - { - LOG.debug("Exception while notifying success of callback " + callback, x); - } - } - - protected void notifyCallbackFailure(WriteCallback callback, Throwable failure) - { - try - { - if (callback != null) - callback.writeFailed(failure); - } - catch (Throwable x) - { - LOG.debug("Exception while notifying failure of callback " + callback, x); - } - } - - @Override - public String toString() - { - ByteBuffer aggregate = flusher.aggregate; - return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]", - getClass().getSimpleName(), - queue.size(), - aggregate == null ? 0 : aggregate.position(), - failure); - } - private class Flusher extends IteratingCallback { private final List entries = new ArrayList<>(maxGather); - private final List buffers = new ArrayList<>(maxGather * 2 + 1); + private final List buffers = new ArrayList<>((maxGather * 2) + 1); private ByteBuffer aggregate; private BatchMode batchMode; - @Override - protected Action process() throws Exception - { - int space = aggregate == null ? bufferSize : BufferUtil.space(aggregate); - BatchMode currentBatchMode = BatchMode.AUTO; - synchronized (lock) - { - while (entries.size() <= maxGather && !queue.isEmpty()) - { - FrameEntry entry = queue.remove(0); - currentBatchMode = BatchMode.max(currentBatchMode, entry.batchMode); - - // Force flush if we need to. - if (entry.frame == FLUSH_FRAME) - currentBatchMode = BatchMode.OFF; - - int payloadLength = BufferUtil.length(entry.frame.getPayload()); - int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength; - - // If it is a "big" frame, avoid copying into the aggregate buffer. - if (approxFrameLength > (bufferSize >> 2)) - currentBatchMode = BatchMode.OFF; - - // If the aggregate buffer overflows, do not batch. - space -= approxFrameLength; - if (space <= 0) - currentBatchMode = BatchMode.OFF; - - entries.add(entry); - } - } - - if (LOG.isDebugEnabled()) - LOG.debug("{} processing {} entries: {}", FrameFlusher.this, entries.size(), entries); - - if (entries.isEmpty()) - { - if (batchMode != BatchMode.AUTO) - { - // Nothing more to do, release the aggregate buffer if we need to. - // Releasing it here rather than in succeeded() allows for its reuse. - releaseAggregate(); - return Action.IDLE; - } - - LOG.debug("{} auto flushing", FrameFlusher.this); - return flush(); - } - - batchMode = currentBatchMode; - - return currentBatchMode == BatchMode.OFF ? flush() : batch(); - } - - private Action flush() - { - if (!BufferUtil.isEmpty(aggregate)) - { - buffers.add(aggregate); - if (LOG.isDebugEnabled()) - LOG.debug("{} flushing aggregate {}", FrameFlusher.this, aggregate); - } - - // Do not allocate the iterator here. - for (int i = 0; i < entries.size(); ++i) - { - FrameEntry entry = entries.get(i); - // Skip the "synthetic" frame used for flushing. - if (entry.frame == FLUSH_FRAME) - continue; - buffers.add(entry.generateHeaderBytes()); - ByteBuffer payload = entry.frame.getPayload(); - if (BufferUtil.hasContent(payload)) - buffers.add(payload); - } - - if (LOG.isDebugEnabled()) - LOG.debug("{} flushing {} frames: {}", FrameFlusher.this, entries.size(), entries); - - if (buffers.isEmpty()) - { - releaseAggregate(); - // We may have the FLUSH_FRAME to notify. - succeedEntries(); - return Action.IDLE; - } - - endpoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()])); - buffers.clear(); - return Action.SCHEDULED; - } - private Action batch() { if (aggregate == null) { - aggregate = bufferPool.acquire(bufferSize, true); + aggregate = bufferPool.acquire(bufferSize,true); if (LOG.isDebugEnabled()) - LOG.debug("{} acquired aggregate buffer {}", FrameFlusher.this, aggregate); + { + LOG.debug("{} acquired aggregate buffer {}",FrameFlusher.this,aggregate); + } } // Do not allocate the iterator here. @@ -296,17 +71,149 @@ public class FrameFlusher ByteBuffer payload = entry.frame.getPayload(); if (BufferUtil.hasContent(payload)) - BufferUtil.append(aggregate, payload); + { + BufferUtil.append(aggregate,payload); + } } if (LOG.isDebugEnabled()) - LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries); + { + LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries); + } succeeded(); return Action.SCHEDULED; } + @Override + protected void completed() + { + // This IteratingCallback never completes. + } + + @Override + public void failed(Throwable x) + { + for (FrameEntry entry : entries) + { + notifyCallbackFailure(entry.callback,x); + entry.release(); + } + entries.clear(); + super.failed(x); + failure = x; + onFailure(x); + } + + private Action flush() + { + if (!BufferUtil.isEmpty(aggregate)) + { + buffers.add(aggregate); + if (LOG.isDebugEnabled()) + { + LOG.debug("{} flushing aggregate {}",FrameFlusher.this,aggregate); + } + } + + // Do not allocate the iterator here. + for (int i = 0; i < entries.size(); ++i) + { + FrameEntry entry = entries.get(i); + // Skip the "synthetic" frame used for flushing. + if (entry.frame == FLUSH_FRAME) + { + continue; + } + buffers.add(entry.generateHeaderBytes()); + ByteBuffer payload = entry.frame.getPayload(); + if (BufferUtil.hasContent(payload)) + { + buffers.add(payload); + } + } + + if (LOG.isDebugEnabled()) + { + LOG.debug("{} flushing {} frames: {}",FrameFlusher.this,entries.size(),entries); + } + + if (buffers.isEmpty()) + { + releaseAggregate(); + // We may have the FLUSH_FRAME to notify. + succeedEntries(); + return Action.IDLE; + } + + endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()])); + buffers.clear(); + return Action.SCHEDULED; + } + + @Override + protected Action process() throws Exception + { + int space = aggregate == null?bufferSize:BufferUtil.space(aggregate); + BatchMode currentBatchMode = BatchMode.AUTO; + synchronized (lock) + { + while ((entries.size() <= maxGather) && !queue.isEmpty()) + { + FrameEntry entry = queue.remove(0); + currentBatchMode = BatchMode.max(currentBatchMode,entry.batchMode); + + // Force flush if we need to. + if (entry.frame == FLUSH_FRAME) + { + currentBatchMode = BatchMode.OFF; + } + + int payloadLength = BufferUtil.length(entry.frame.getPayload()); + int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength; + + // If it is a "big" frame, avoid copying into the aggregate buffer. + if (approxFrameLength > (bufferSize >> 2)) + { + currentBatchMode = BatchMode.OFF; + } + + // If the aggregate buffer overflows, do not batch. + space -= approxFrameLength; + if (space <= 0) + { + currentBatchMode = BatchMode.OFF; + } + + entries.add(entry); + } + } + + if (LOG.isDebugEnabled()) + { + LOG.debug("{} processing {} entries: {}",FrameFlusher.this,entries.size(),entries); + } + + if (entries.isEmpty()) + { + if (batchMode != BatchMode.AUTO) + { + // Nothing more to do, release the aggregate buffer if we need to. + // Releasing it here rather than in succeeded() allows for its reuse. + releaseAggregate(); + return Action.IDLE; + } + + LOG.debug("{} auto flushing",FrameFlusher.this); + return flush(); + } + + batchMode = currentBatchMode; + + return currentBatchMode == BatchMode.OFF?flush():batch(); + } + private void releaseAggregate() { - if (aggregate != null && BufferUtil.isEmpty(aggregate)) + if ((aggregate != null) && BufferUtil.isEmpty(aggregate)) { bufferPool.release(aggregate); aggregate = null; @@ -331,26 +238,6 @@ public class FrameFlusher } entries.clear(); } - - @Override - protected void completed() - { - // This IteratingCallback never completes. - } - - @Override - public void failed(Throwable x) - { - for (FrameEntry entry : entries) - { - notifyCallbackFailure(entry.callback, x); - entry.release(); - } - entries.clear(); - super.failed(x); - failure = x; - onFailure(x); - } } private class FrameEntry @@ -374,7 +261,7 @@ public class FrameFlusher private void generateHeaderBytes(ByteBuffer buffer) { - generator.generateHeaderBytes(frame, buffer); + generator.generateHeaderBytes(frame,buffer); } private void release() @@ -389,7 +276,145 @@ public class FrameFlusher @Override public String toString() { - return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, batchMode, failure); + return String.format("%s[%s,%s,%s,%s]",getClass().getSimpleName(),frame,callback,batchMode,failure); } } + + public static final BinaryFrame FLUSH_FRAME = new BinaryFrame(); + private static final Logger LOG = Log.getLogger(FrameFlusher.class); + private final ByteBufferPool bufferPool; + private final EndPoint endpoint; + private final int bufferSize; + private final Generator generator; + private final int maxGather; + private final Object lock = new Object(); + private final ArrayQueue queue = new ArrayQueue<>(16,16,lock); + private final Flusher flusher = new Flusher(); + private final AtomicBoolean closed = new AtomicBoolean(); + private volatile Throwable failure; + + public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather) + { + this.bufferPool = bufferPool; + this.endpoint = endpoint; + this.bufferSize = bufferSize; + this.generator = Objects.requireNonNull(generator); + this.maxGather = maxGather; + } + + public void close() + { + if (closed.compareAndSet(false,true)) + { + LOG.debug("{} closing {}",this); + EOFException eof = new EOFException("Connection has been closed locally"); + flusher.failed(eof); + + // Fail also queued entries. + List entries = new ArrayList<>(); + synchronized (lock) + { + entries.addAll(queue); + queue.clear(); + } + // Notify outside sync block. + for (FrameEntry entry : entries) + { + notifyCallbackFailure(entry.callback,eof); + } + } + } + + public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode) + { + if (closed.get()) + { + notifyCallbackFailure(callback,new EOFException("Connection has been closed locally")); + return; + } + if (flusher.isFailed()) + { + notifyCallbackFailure(callback,failure); + return; + } + + FrameEntry entry = new FrameEntry(frame,callback,batchMode); + + synchronized (lock) + { + switch (frame.getOpCode()) + { + case OpCode.PING: + { + // Prepend PINGs so they are processed first. + queue.add(0,entry); + break; + } + case OpCode.CLOSE: + { + // There may be a chance that other frames are + // added after this close frame, but we will + // fail them later to keep it simple here. + closed.set(true); + queue.add(entry); + break; + } + default: + { + queue.add(entry); + break; + } + } + } + + if (LOG.isDebugEnabled()) + { + LOG.debug("{} queued {}",this,entry); + } + + flusher.iterate(); + } + + protected void notifyCallbackFailure(WriteCallback callback, Throwable failure) + { + try + { + if (callback != null) + { + callback.writeFailed(failure); + } + } + catch (Throwable x) + { + LOG.debug("Exception while notifying failure of callback " + callback,x); + } + } + + protected void notifyCallbackSuccess(WriteCallback callback) + { + try + { + if (callback != null) + { + callback.writeSuccess(); + } + } + catch (Throwable x) + { + LOG.debug("Exception while notifying success of callback " + callback,x); + } + } + + protected void onFailure(Throwable x) + { + LOG.warn(x); + } + + @Override + public String toString() + { + ByteBuffer aggregate = flusher.aggregate; + return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(), + failure); + } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java index 6d4dbc42729..815d2d30ad0 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java @@ -139,6 +139,10 @@ public class IOState { for (ConnectionStateListener listener : listeners) { + if (LOG.isDebugEnabled()) + { + LOG.debug("{}.onConnectionStateChange({})",listener.getClass().getSimpleName(),state.name()); + } listener.onConnectionStateChange(state); } } @@ -166,8 +170,7 @@ public class IOState } this.state = ConnectionState.CLOSED; - if (closeInfo == null) - this.closeInfo = close; + this.closeInfo = close; this.inputAvailable = false; this.outputAvailable = false; this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL; @@ -193,16 +196,16 @@ public class IOState if (initialState == ConnectionState.CONNECTED) { - // fast close. a local close request from end-user onConnected() method + // fast close. a local close request from end-user onConnect/onOpen method LOG.debug("FastClose in CONNECTED detected"); // Force the state open (to allow read/write to endpoint) onOpened(); + LOG.debug("FastClose continuing with Closure"); } synchronized (this) { - if (closeInfo == null) - closeInfo = close; + closeInfo = close; boolean in = inputAvailable; boolean out = outputAvailable; @@ -236,7 +239,6 @@ public class IOState LOG.debug("notifying state listeners: {}",event); notifyStateListeners(event); - /* // if abnormal, we don't expect an answer. if (close.isAbnormal()) { @@ -253,7 +255,6 @@ public class IOState notifyStateListeners(event); return; } - */ } } @@ -272,8 +273,7 @@ public class IOState return; } - if (closeInfo == null) - closeInfo = close; + closeInfo = close; boolean in = inputAvailable; boolean out = outputAvailable; @@ -360,7 +360,7 @@ public class IOState // already opened return; } - + if (this.state != ConnectionState.CONNECTED) { LOG.debug("Unable to open, not in CONNECTED state: {}",this.state); @@ -394,12 +394,11 @@ public class IOState return; } - CloseInfo close = new CloseInfo(StatusCode.NO_CLOSE,"Read EOF"); + CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Read EOF"); this.cleanClose = false; this.state = ConnectionState.CLOSED; - if (closeInfo == null) - this.closeInfo = close; + this.closeInfo = close; this.inputAvailable = false; this.outputAvailable = false; this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL; @@ -408,6 +407,58 @@ public class IOState notifyStateListeners(event); } + public void onDisconnected() + { + ConnectionState event = null; + synchronized (this) + { + if (this.state == ConnectionState.CLOSED) + { + // already closed + return; + } + + CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected"); + + this.cleanClose = false; + this.state = ConnectionState.CLOSED; + this.closeInfo = close; + this.inputAvailable = false; + this.outputAvailable = false; + this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL; + event = this.state; + } + notifyStateListeners(event); + } + + @Override + public String toString() + { + StringBuilder str = new StringBuilder(); + str.append(this.getClass().getSimpleName()); + str.append("@").append(Integer.toHexString(hashCode())); + str.append("[").append(state); + str.append(','); + if (!inputAvailable) + { + str.append('!'); + } + str.append("in,"); + if (!outputAvailable) + { + str.append('!'); + } + str.append("out"); + if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING)) + { + str.append(",close=").append(closeInfo); + str.append(",clean=").append(cleanClose); + str.append(",closeSource=").append(closeHandshakeSource); + } + str.append(']'); + return str.toString(); + } + public boolean wasAbnormalClose() { return closeHandshakeSource == CloseHandshakeSource.ABNORMAL; @@ -427,4 +478,5 @@ public class IOState { return closeHandshakeSource == CloseHandshakeSource.REMOTE; } + } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/util/ReflectUtils.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/util/ReflectUtils.java index 32b9c1b72da..2194ec516c4 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/util/ReflectUtils.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/util/ReflectUtils.java @@ -392,4 +392,4 @@ public class ReflectUtils } return name; } -} +} \ No newline at end of file diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/CloseInfoTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/CloseInfoTest.java new file mode 100644 index 00000000000..a40e3063369 --- /dev/null +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/CloseInfoTest.java @@ -0,0 +1,166 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common; + +import static org.eclipse.jetty.websocket.api.StatusCode.*; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.websocket.api.ProtocolException; +import org.eclipse.jetty.websocket.common.frames.CloseFrame; +import org.junit.Test; + +public class CloseInfoTest +{ + /** + * A test where no close is provided + */ + @Test + public void testAnonymousClose() + { + CloseInfo close = new CloseInfo(); + assertThat("close.code",close.getStatusCode(),is(NO_CODE)); + assertThat("close.reason",close.getReason(),nullValue()); + + CloseFrame frame = close.asFrame(); + assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE)); + // should result in no payload + assertThat("close frame has payload",frame.hasPayload(),is(false)); + assertThat("close frame payload length",frame.getPayloadLength(),is(0)); + } + + /** + * A test where NO_CODE (1005) is provided + */ + @Test + public void testNoCode() + { + CloseInfo close = new CloseInfo(NO_CODE); + assertThat("close.code",close.getStatusCode(),is(NO_CODE)); + assertThat("close.reason",close.getReason(),nullValue()); + + CloseFrame frame = close.asFrame(); + assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE)); + // should result in no payload + assertThat("close frame has payload",frame.hasPayload(),is(false)); + assertThat("close frame payload length",frame.getPayloadLength(),is(0)); + } + + /** + * A test where NO_CLOSE (1006) is provided + */ + @Test + public void testNoClose() + { + CloseInfo close = new CloseInfo(NO_CLOSE); + assertThat("close.code",close.getStatusCode(),is(NO_CLOSE)); + assertThat("close.reason",close.getReason(),nullValue()); + + CloseFrame frame = close.asFrame(); + assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE)); + // should result in no payload + assertThat("close frame has payload",frame.hasPayload(),is(false)); + assertThat("close frame payload length",frame.getPayloadLength(),is(0)); + } + + /** + * A test of FAILED_TLS_HANDSHAKE (1007) + */ + @Test + public void testFailedTlsHandshake() + { + CloseInfo close = new CloseInfo(FAILED_TLS_HANDSHAKE); + assertThat("close.code",close.getStatusCode(),is(FAILED_TLS_HANDSHAKE)); + assertThat("close.reason",close.getReason(),nullValue()); + + try + { + @SuppressWarnings("unused") + CloseFrame frame = close.asFrame(); + fail("Expected " + ProtocolException.class.getName()); + } + catch (ProtocolException e) + { + // expected path + assertThat("ProtocolException message",e.getMessage(),containsString("not allowed (per RFC6455)")); + } + } + + /** + * A test of NORMAL (1000) + */ + @Test + public void testNormal() + { + CloseInfo close = new CloseInfo(NORMAL); + assertThat("close.code",close.getStatusCode(),is(NORMAL)); + assertThat("close.reason",close.getReason(),nullValue()); + + CloseFrame frame = close.asFrame(); + assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE)); + assertThat("close frame payload length",frame.getPayloadLength(),is(2)); + } + + private ByteBuffer asByteBuffer(int statusCode, String reason) + { + int len = 2; // status code length + byte utf[] = null; + if (StringUtil.isNotBlank(reason)) + { + utf = StringUtil.getUtf8Bytes(reason); + len += utf.length; + } + + ByteBuffer buf = BufferUtil.allocate(len); + BufferUtil.flipToFill(buf); + buf.put((byte)((statusCode >>> 8) & 0xFF)); + buf.put((byte)((statusCode >>> 0) & 0xFF)); + + if (utf != null) + { + buf.put(utf,0,utf.length); + } + BufferUtil.flipToFlush(buf,0); + + return buf; + } + + @Test + public void testFromFrame() + { + ByteBuffer payload = asByteBuffer(NORMAL,null); + assertThat("payload length", payload.remaining(), is(2)); + CloseFrame frame = new CloseFrame(); + frame.setPayload(payload); + + // create from frame + CloseInfo close = new CloseInfo(frame); + assertThat("close.code",close.getStatusCode(),is(NORMAL)); + assertThat("close.reason",close.getReason(),nullValue()); + + // and back again + frame = close.asFrame(); + assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE)); + assertThat("close frame payload length",frame.getPayloadLength(),is(2)); + } +} diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase4.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase4.java index fe6011db002..6b0f01836c3 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase4.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/ab/TestABCase4.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common.ab; import java.nio.ByteBuffer; import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.websocket.api.ProtocolException; import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketPolicy; @@ -39,8 +40,7 @@ public class TestABCase4 { ByteBuffer expected = ByteBuffer.allocate(32); - expected.put(new byte[] - { (byte)0x8b, 0x00 }); + expected.put(new byte[] { (byte)0x8b, 0x00 }); expected.flip(); @@ -50,10 +50,17 @@ public class TestABCase4 { Parser parser = new UnitParser(policy); parser.setIncomingFramesHandler(capture); - parser.parse(expected); + try + { + parser.parse(expected); + } + catch (ProtocolException ignore) + { + // ignore + } } - Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ; + Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class)); Throwable known = capture.getErrors().poll(); @@ -65,8 +72,7 @@ public class TestABCase4 { ByteBuffer expected = ByteBuffer.allocate(32); - expected.put(new byte[] - { (byte)0x8c, 0x01, 0x00 }); + expected.put(new byte[] { (byte)0x8c, 0x01, 0x00 }); expected.flip(); @@ -76,24 +82,29 @@ public class TestABCase4 { Parser parser = new UnitParser(policy); parser.setIncomingFramesHandler(capture); - parser.parse(expected); + try + { + parser.parse(expected); + } + catch (ProtocolException ignore) + { + // ignore + } } - Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ; + Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class)); Throwable known = capture.getErrors().poll(); Assert.assertTrue("undefined option should be in message",known.getMessage().contains("Unknown opcode: 12")); } - @Test public void testParserNonControlOpCode3Case4_1_1() throws Exception { ByteBuffer expected = ByteBuffer.allocate(32); - expected.put(new byte[] - { (byte)0x83, 0x00 }); + expected.put(new byte[] { (byte)0x83, 0x00 }); expected.flip(); @@ -103,10 +114,17 @@ public class TestABCase4 { Parser parser = new UnitParser(policy); parser.setIncomingFramesHandler(capture); - parser.parse(expected); + try + { + parser.parse(expected); + } + catch (ProtocolException ignore) + { + // ignore + } } - Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ; + Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class)); Throwable known = capture.getErrors().poll(); @@ -118,8 +136,7 @@ public class TestABCase4 { ByteBuffer expected = ByteBuffer.allocate(32); - expected.put(new byte[] - { (byte)0x84, 0x01, 0x00 }); + expected.put(new byte[] { (byte)0x84, 0x01, 0x00 }); expected.flip(); @@ -129,10 +146,17 @@ public class TestABCase4 { Parser parser = new UnitParser(policy); parser.setIncomingFramesHandler(capture); - parser.parse(expected); + try + { + parser.parse(expected); + } + catch (ProtocolException ignore) + { + // ignore + } } - Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ; + Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class)); Throwable known = capture.getErrors().poll(); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java index bd02460c663..dedba3e351c 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadClient.java @@ -18,7 +18,8 @@ package org.eclipse.jetty.websocket.common.test; -import java.io.Closeable; +import static org.hamcrest.Matchers.*; + import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -68,10 +69,6 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener; import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser; import org.junit.Assert; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; - /** * A simple websocket client for performing unit tests with. *

@@ -84,7 +81,7 @@ import static org.hamcrest.Matchers.notNullValue; * with regards to basic IO behavior, a write should work as expected, a read should work as expected, but what byte it sends or reads is not within its * scope. */ -public class BlockheadClient implements IncomingFrames, OutgoingFrames, ConnectionStateListener, Closeable +public class BlockheadClient implements IncomingFrames, OutgoingFrames, ConnectionStateListener, AutoCloseable { private static final String REQUEST_HASH_KEY = "dGhlIHNhbXBsZSBub25jZQ=="; private static final int BUFFER_SIZE = 8192; @@ -182,22 +179,14 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti public void close(int statusCode, String message) { + LOG.debug("close({},{})",statusCode,message); CloseInfo close = new CloseInfo(statusCode,message); - ioState.onCloseLocal(close); - if (!ioState.isClosed()) { - WebSocketFrame frame = close.asFrame(); - LOG.debug("Issuing: {}",frame); - try - { - write(frame); - } - catch (IOException e) - { - LOG.debug(e); - } + ioState.onCloseLocal(close); + } else { + LOG.debug("Not issuing close. ioState = {}",ioState); } } @@ -429,13 +418,8 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti { LOG.info("Client parsed {} frames",count); } - - if (frame.getOpCode() == OpCode.CLOSE) - { - CloseInfo close = new CloseInfo(frame); - ioState.onCloseRemote(close); - } - + + // Capture Frame Copy WebSocketFrame copy = WebSocketFrame.copy(frame); incomingFrames.incomingFrame(copy); } @@ -448,6 +432,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti @Override public void onConnectionStateChange(ConnectionState state) { + LOG.debug("CLIENT onConnectionStateChange() - {}", state); switch (state) { case CLOSED: @@ -455,10 +440,17 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti // this.disconnect(); break; case CLOSING: - if (ioState.wasRemoteCloseInitiated()) + CloseInfo close = ioState.getCloseInfo(); + + WebSocketFrame frame = close.asFrame(); + LOG.debug("Issuing: {}",frame); + try { - CloseInfo close = ioState.getCloseInfo(); - close(close.getStatusCode(),close.getReason()); + write(frame); + } + catch (IOException e) + { + LOG.debug(e); } break; default: @@ -701,6 +693,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti { if (!ioState.isOpen()) { + LOG.debug("IO Not Open / Not Writing: {}",frame); return; } LOG.debug("write(Frame->{}) to {}",frame,outgoing); diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java index c3cd1ad8554..1b665bded92 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServer.java @@ -54,6 +54,7 @@ import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.IncomingFrames; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; +import org.eclipse.jetty.websocket.api.extensions.Frame.Type; import org.eclipse.jetty.websocket.common.AcceptHash; import org.eclipse.jetty.websocket.common.CloseInfo; import org.eclipse.jetty.websocket.common.Generator; @@ -124,7 +125,6 @@ public class BlockheadServer { write(new CloseFrame()); flush(); - disconnect(); } public void close(int statusCode) throws IOException @@ -132,7 +132,6 @@ public class BlockheadServer CloseInfo close = new CloseInfo(statusCode); write(close.asFrame()); flush(); - disconnect(); } public void disconnect() @@ -229,6 +228,19 @@ public class BlockheadServer CloseInfo close = new CloseInfo(frame); LOG.debug("Close frame: {}",close); } + + Type type = frame.getType(); + if (echoing.get() && (type.isData() || type.isContinuation())) + { + try + { + write(WebSocketFrame.copy(frame)); + } + catch (IOException e) + { + LOG.warn(e); + } + } } @Override @@ -317,9 +329,18 @@ public class BlockheadServer return len; } + /** + * @deprecated use {@link #readFrames(int, int, TimeUnit)} for correct parameter order + */ + @Deprecated public IncomingFramesCapture readFrames(int expectedCount, TimeUnit timeoutUnit, int timeoutDuration) throws IOException, TimeoutException { - LOG.debug("Read: waiting for {} frame(s) from server",expectedCount); + return readFrames(expectedCount,timeoutDuration,timeoutUnit); + } + + public IncomingFramesCapture readFrames(int expectedCount, int timeoutDuration, TimeUnit timeoutUnit) throws IOException, TimeoutException + { + LOG.debug("Read: waiting for {} frame(s) from client",expectedCount); int startCount = incomingFrames.size(); ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false); @@ -562,13 +583,22 @@ public class BlockheadServer public void write(Frame frame) throws IOException { LOG.debug("write(Frame->{}) to {}",frame,outgoing); - outgoing.outgoingFrame(frame,null, BatchMode.OFF); + outgoing.outgoingFrame(frame,null,BatchMode.OFF); } public void write(int b) throws IOException { getOutputStream().write(b); } + + public void write(ByteBuffer buf) throws IOException + { + byte arr[] = BufferUtil.toArray(buf); + if ((arr != null) && (arr.length > 0)) + { + getOutputStream().write(arr); + } + } } private static final Logger LOG = Log.getLogger(BlockheadServer.class); diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java index bebe7c0c379..f6a2f2c7bd8 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java @@ -44,6 +44,7 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.websocket.api.InvalidWebSocketException; +import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory; @@ -199,11 +200,23 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc } } - protected void closeAllConnections() + protected void shutdownAllConnections() { for (WebSocketSession session : openSessions) { - session.close(); + if (session.getConnection() != null) + { + try + { + session.getConnection().close( + StatusCode.SHUTDOWN, + "Shutdown"); + } + catch (Throwable t) + { + LOG.debug("During Shutdown All Connections",t); + } + } } openSessions.clear(); } @@ -269,7 +282,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc @Override protected void doStop() throws Exception { - closeAllConnections(); + shutdownAllConnections(); super.doStop(); } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java index 0c2ddb32eae..5a4707c1af1 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java @@ -114,7 +114,7 @@ public class WebSocketCloseTest public void onWebSocketConnect(Session sess) { LOG.debug("onWebSocketConnect({})",sess); - sess.close(); + sess.close(StatusCode.NORMAL,"FastCloseServer"); } } @@ -129,14 +129,10 @@ public class WebSocketCloseTest public void onWebSocketConnect(Session sess) { LOG.debug("onWebSocketConnect({})",sess); + // Test failure due to unhandled exception + // this should trigger a fast-fail closure during open/connect throw new RuntimeException("Intentional FastFail"); } - - @Override - public void onWebSocketError(Throwable cause) - { - errors.add(cause); - } } private static final Logger LOG = Log.getLogger(WebSocketCloseTest.class); @@ -163,30 +159,28 @@ public class WebSocketCloseTest @Test public void testFastClose() throws Exception { - BlockheadClient client = new BlockheadClient(server.getServerUri()); - client.setProtocols("fastclose"); - client.setTimeout(TimeUnit.SECONDS,1); - try + try (BlockheadClient client = new BlockheadClient(server.getServerUri())) { + client.setProtocols("fastclose"); + client.setTimeout(TimeUnit.SECONDS,1); client.connect(); client.sendStandardRequest(); client.expectUpgradeResponse(); + // Verify that client got close frame IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1); WebSocketFrame frame = capture.getFrames().poll(); Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE)); CloseInfo close = new CloseInfo(frame); Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL)); - + + // Notify server of close handshake client.write(close.asFrame()); // respond with close - + + // ensure server socket got close event Assert.assertThat("Fast Close Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true)); Assert.assertThat("Fast Close.statusCode",closeSocket.closeStatusCode,is(StatusCode.NORMAL)); } - finally - { - client.close(); - } } /** @@ -195,11 +189,10 @@ public class WebSocketCloseTest @Test public void testFastFail() throws Exception { - BlockheadClient client = new BlockheadClient(server.getServerUri()); - client.setProtocols("fastfail"); - client.setTimeout(TimeUnit.SECONDS,1); - try + try (BlockheadClient client = new BlockheadClient(server.getServerUri())) { + client.setProtocols("fastfail"); + client.setTimeout(TimeUnit.SECONDS,1); try (StacklessLogging scope = new StacklessLogging(AbstractEventDriver.class)) { client.connect(); @@ -214,14 +207,11 @@ public class WebSocketCloseTest client.write(close.asFrame()); // respond with close + // ensure server socket got close event Assert.assertThat("Fast Fail Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true)); Assert.assertThat("Fast Fail.statusCode",closeSocket.closeStatusCode,is(StatusCode.SERVER_ERROR)); Assert.assertThat("Fast Fail.errors",closeSocket.errors.size(),is(1)); } } - finally - { - client.close(); - } } }