diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketStatsTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketStatsTest.java index 795120cbd29..226ff8f2e01 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketStatsTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketStatsTest.java @@ -179,7 +179,7 @@ public class WebSocketStatsTest ClientSocket socket = new ClientSocket(); CompletableFuture connect = client.connect(socket, uri); - final long numMessages = 10000; + final long numMessages = 1000; final String msgText = "hello world"; long upgradeSentBytes; diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java index 395afc771eb..c7a231017ab 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java @@ -328,25 +328,33 @@ public class ClientCloseTest assertThat(serverEndpoints.size(), is(sessionCount)); - // block all the server threads - for (int i = 0; i < sessionCount; i++) - clientSockets.get(i).getSession().getRemote().sendString("block"); + try + { + // block all the server threads + for (int i = 0; i < sessionCount; i++) + clientSockets.get(i).getSession().getRemote().sendString("block"); - assertTimeoutPreemptively(ofSeconds(5), () -> { - // client lifecycle stop (the meat of this test) - client.stop(); - }); + assertTimeoutPreemptively(ofSeconds(5), () -> + { + // client lifecycle stop (the meat of this test) + client.stop(); + }); - // clients disconnect - for (int i = 0; i < sessionCount; i++) - clientSockets.get(i).assertReceivedCloseEvent(2000, is(StatusCode.ABNORMAL), containsString("Channel Closed")); + // clients disconnect + for (int i = 0; i < sessionCount; i++) + clientSockets.get(i).assertReceivedCloseEvent(2000, is(StatusCode.ABNORMAL), containsString("Channel Closed")); - // ensure all Sessions are gone. connections are gone. etc. (client and server) - // ensure ConnectionListener onClose is called 3 times - clientSessionTracker.assertClosedProperly(client); + // ensure all Sessions are gone. connections are gone. etc. (client and server) + // ensure ConnectionListener onClose is called 3 times + clientSessionTracker.assertClosedProperly(client); - for (int i = 0; i < sessionCount; i++) - serverEndpoints.get(i).block.countDown(); + assertThat(serverEndpoints.size(), is(sessionCount)); + } + finally + { + for (int i = 0; i < sessionCount; i++) + serverEndpoints.get(i).block.countDown(); + } } @Test @@ -367,28 +375,35 @@ public class ClientCloseTest // client confirms connection via echo confirmConnection(clientSocket, clientConnectFuture); - // Block on the server so that the server does not detect a read failure - clientSocket.getSession().getRemote().sendString("block"); + try + { + // Block on the server so that the server does not detect a read failure + clientSocket.getSession().getRemote().sendString("block"); - // setup client endpoint for write failure (test only) - EndPoint endp = clientSocket.getEndPoint(); - endp.shutdownOutput(); + // setup client endpoint for write failure (test only) + EndPoint endp = clientSocket.getEndPoint(); + endp.shutdownOutput(); - // client enqueue close frame - // should result in a client write failure - final String origCloseReason = "Normal Close from Client"; - clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); + // client enqueue close frame + // should result in a client write failure + final String origCloseReason = "Normal Close from Client"; + clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); - assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true)); - assertThat("OnError", clientSocket.error.get(), instanceOf(EofException.class)); + assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true)); + assertThat("OnError", clientSocket.error.get(), instanceOf(EofException.class)); - // client triggers close event on client ws-endpoint - // assert - close code==1006 (abnormal) - clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), null); - clientSessionTracker.assertClosedProperly(client); + // client triggers close event on client ws-endpoint + // assert - close code==1006 (abnormal) + clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), null); + clientSessionTracker.assertClosedProperly(client); - assertThat(serverEndpoints.size(), is(1)); - serverEndpoints.get(0).block.countDown(); + assertThat(serverEndpoints.size(), is(1)); + } + finally + { + for (ServerEndpoint endpoint : serverEndpoints) + endpoint.block.countDown(); + } } public static class ServerEndpoint implements WebSocketFrameListener, WebSocketListener diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java index 4a34ebe776d..335da69d1cb 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java @@ -32,7 +32,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.Utf8Appendable; @@ -363,29 +362,6 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio callback.failed(e); } } - - - } - - AbnormalCloseStatus abnormalCloseStatusFor(Throwable cause) - { - int code; - if (cause instanceof ProtocolException) - code = CloseStatus.PROTOCOL; - else if (cause instanceof CloseException) - code = ((CloseException)cause).getStatusCode(); - else if (cause instanceof Utf8Appendable.NotUtf8Exception) - code = CloseStatus.BAD_PAYLOAD; - else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException) - code = CloseStatus.SHUTDOWN; - else if (cause instanceof EofException) - code = CloseStatus.NO_CLOSE; - else if (behavior == Behavior.CLIENT) - code = CloseStatus.POLICY_VIOLATION; - else - code = CloseStatus.SERVER_ERROR; - - return new AbnormalCloseStatus(code, cause); } /** @@ -401,14 +377,24 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio if (LOG.isDebugEnabled()) LOG.debug("processConnectionError {} {}", this, cause); - CloseStatus closeStatus = abnormalCloseStatusFor(cause); - - if (closeStatus.getCode() == CloseStatus.PROTOCOL) - close(closeStatus, callback); - else if (channelState.onClosed(closeStatus)) - closeConnection(cause, closeStatus, callback); + int code; + if (cause instanceof CloseException) + code = ((CloseException)cause).getStatusCode(); + else if (cause instanceof Utf8Appendable.NotUtf8Exception) + code = CloseStatus.BAD_PAYLOAD; + else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException) + code = CloseStatus.SHUTDOWN; else - callback.failed(cause); + code = CloseStatus.NO_CLOSE; + + AbnormalCloseStatus closeStatus = new AbnormalCloseStatus(code, cause); + if (CloseStatus.isTransmittableStatusCode(code)) + close(closeStatus, callback); + else + { + if (channelState.onClosed(closeStatus)) + closeConnection(cause, closeStatus, callback); + } } /** @@ -423,7 +409,19 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio if (LOG.isDebugEnabled()) LOG.debug("processHandlerError {} {}", this, cause); - close(abnormalCloseStatusFor(cause), callback); + int code; + if (cause instanceof CloseException) + code = ((CloseException)cause).getStatusCode(); + else if (cause instanceof Utf8Appendable.NotUtf8Exception) + code = CloseStatus.BAD_PAYLOAD; + else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException) + code = CloseStatus.SHUTDOWN; + else if (behavior == Behavior.CLIENT) + code = CloseStatus.POLICY_VIOLATION; + else + code = CloseStatus.SERVER_ERROR; + + close(new AbnormalCloseStatus(code, cause), callback); } /** @@ -515,6 +513,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio } catch (Throwable t) { + if (LOG.isDebugEnabled()) + LOG.warn("Invalid outgoing frame: {}", frame); + callback.failed(t); return; } @@ -523,10 +524,10 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio { synchronized(flusher) { - boolean closeConnection = channelState.onOutgoingFrame(frame); if (LOG.isDebugEnabled()) - LOG.debug("sendFrame({}, {}, {}) {}", frame, callback, batch, closeConnection); + LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); + boolean closeConnection = channelState.onOutgoingFrame(frame); if (closeConnection) { Throwable cause = AbnormalCloseStatus.getCause(CloseStatus.getCloseStatus(frame)); @@ -546,6 +547,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio } catch (Throwable t) { + if (LOG.isDebugEnabled()) + LOG.debug("Failed sendFrame()", t); + if (frame.getOpCode() == OpCode.CLOSE) { CloseStatus closeStatus = CloseStatus.getCloseStatus(frame); diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java index 827b1d3a192..3cabfc338c9 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.websocket.core; import java.net.Socket; +import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -52,6 +53,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -315,21 +317,22 @@ public class WebSocketCloseTest extends WebSocketTester client.close(); assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS)); - while(true) - { - if (!server.isOpen()) - break; - - server.sendFrame(new Frame(OpCode.TEXT, BufferUtil.toBuffer("frame after close")), Callback.NOOP); - } + assertTimeoutPreemptively(Duration.ofSeconds(1), ()->{ + while(true) + { + if (!server.isOpen()) + break; + server.sendFrame(new Frame(OpCode.TEXT, BufferUtil.toBuffer("frame after close")), Callback.NOOP); + Thread.sleep(100); + } + }); assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS)); assertNotNull(server.handler.error); - assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR)); + assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE)); Callback callback = server.handler.receivedCallback.poll(5, TimeUnit.SECONDS); callback.succeeded(); - assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR)); } @ParameterizedTest @@ -433,7 +436,7 @@ public class WebSocketCloseTest extends WebSocketTester @Override public void onOpen(CoreSession coreSession) { - LOG.info("onOpen {}", coreSession); + LOG.debug("onOpen {}", coreSession); session = coreSession; state = session.toString(); opened.countDown(); @@ -442,7 +445,7 @@ public class WebSocketCloseTest extends WebSocketTester @Override public void onFrame(Frame frame, Callback callback) { - LOG.info("onFrame: " + BufferUtil.toDetailString(frame.getPayload())); + LOG.debug("onFrame: " + BufferUtil.toDetailString(frame.getPayload())); state = session.toString(); receivedCallback.offer(callback); receivedFrames.offer(Frame.copy(frame)); @@ -454,7 +457,7 @@ public class WebSocketCloseTest extends WebSocketTester @Override public void onClosed(CloseStatus closeStatus) { - LOG.info("onClosed {}", closeStatus); + LOG.debug("onClosed {}", closeStatus); state = session.toString(); this.closeStatus = closeStatus; closed.countDown(); @@ -463,7 +466,7 @@ public class WebSocketCloseTest extends WebSocketTester @Override public void onError(Throwable cause) { - LOG.info("onError {} ", cause == null?null:cause.toString()); + LOG.debug("onError {} ", cause); error = cause; state = session.toString(); }