From 7fec51ad408382720fd32ee6d5be68334ae020bc Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 22 Jan 2019 15:41:46 +1100 Subject: [PATCH] Issue #2175 cleanups after review Improve ws error handling by splitting processError into handling for errors from the network and errors from the application. Signed-off-by: Greg Wilkins --- .../misbehaving/MisbehavingClassTest.java | 15 ++-- .../core/internal/WebSocketChannel.java | 87 +++++++++++++++---- .../core/internal/WebSocketChannelState.java | 17 +++- 3 files changed, 93 insertions(+), 26 deletions(-) diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/misbehaving/MisbehavingClassTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/misbehaving/MisbehavingClassTest.java index a2e57690c41..6fec5772549 100644 --- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/misbehaving/MisbehavingClassTest.java +++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/misbehaving/MisbehavingClassTest.java @@ -18,6 +18,13 @@ package org.eclipse.jetty.websocket.javax.tests.client.misbehaving; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import javax.websocket.ContainerProvider; +import javax.websocket.WebSocketContainer; + import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.websocket.core.internal.WebSocketChannel; import org.eclipse.jetty.websocket.javax.tests.CoreServer; @@ -25,12 +32,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import javax.websocket.ContainerProvider; -import javax.websocket.WebSocketContainer; -import java.io.IOException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -45,6 +46,7 @@ public class MisbehavingClassTest @BeforeAll public static void startServer() throws Exception { + System.err.println("START"); server = new CoreServer(new CoreServer.EchoNegotiator()); // Start Server server.start(); @@ -53,6 +55,7 @@ public class MisbehavingClassTest @AfterAll public static void stopServer() { + System.err.println("STOP"); try { server.stop(); diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java index d2a53f95cc9..a82f4e0bd9d 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java @@ -318,11 +318,13 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio } } - private CloseStatus closeStatusFor(Throwable cause) + AbnormalCloseStatus closeStatusFor(Throwable cause) { int code; if (cause instanceof ProtocolException) code = CloseStatus.PROTOCOL; + else if (cause instanceof CloseException) + code = ((CloseException)cause).getStatusCode(); else if (cause instanceof Utf8Appendable.NotUtf8Exception) code = CloseStatus.BAD_PAYLOAD; else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException) @@ -332,7 +334,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio else code = CloseStatus.SERVER_ERROR; - return new CloseStatus(code, cause.getMessage()); + return new AbnormalCloseStatus(code, cause.getMessage()); } /** @@ -344,12 +346,19 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio { CloseStatus closeStatus = closeStatusFor(cause); - Callback callback = Callback.from(()->{onClosed(cause, closeStatus);connection.close();}); - if (closeStatus.getCode() == CloseStatus.PROTOCOL) - close(closeStatus, callback, false); + close(closeStatus, Callback.NOOP, false); else - callback.succeeded(); + { + try + { + onClosed(cause, closeStatus); + } + finally + { + connection.close(); + } + } } /** @@ -360,7 +369,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio public void processHandlerError(Throwable cause) { CloseStatus closeStatus = closeStatusFor(cause); - close(closeStatus, Callback.from(()->onClosed(cause, closeStatus)), false); + close(closeStatus, Callback.NOOP, false); } /** @@ -391,6 +400,14 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio catch (Throwable t) { LOG.warn("Error during OPEN", t); + try + { + handler.onError(t); + } + catch (Exception e) + { + t.addSuppressed(e); + } processHandlerError(new CloseException(CloseStatus.SERVER_ERROR, t)); } } @@ -437,11 +454,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio if (LOG.isDebugEnabled()) LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); - boolean closeConnection; try { assertValidOutgoing(frame); - closeConnection = channelState.onOutgoingFrame(frame); } catch (Throwable ex) { @@ -449,11 +464,42 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio return; } + boolean closeConnection; + try + { + closeConnection = channelState.onOutgoingFrame(frame); + } + catch (Throwable ex) + { + try + { + callback.failed(ex); + } + finally + { + if (frame.getOpCode() == OpCode.CLOSE && CloseStatus.getCloseStatus(frame) instanceof AbnormalCloseStatus) + { + try + { + handler.onClosed(CloseStatus.getCloseStatus(frame)); + } + finally + { + connection.close(); + } + } + } + return; + } + + if (frame.getOpCode() == OpCode.CLOSE) { if (LOG.isDebugEnabled()) LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch); + System.err.println(behavior + " Closing " + closeConnection); + if (closeConnection) { callback = new Callback.Nested(callback) @@ -461,24 +507,18 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio @Override public void completed() { + System.err.println(behavior + " completed " + closeConnection); try { handler.onClosed(channelState.getCloseStatus()); } catch (Throwable e) { - try - { - handler.onError(e); - } - catch (Throwable e2) - { - e.addSuppressed(e2); - LOG.warn(e); - } + LOG.warn(e); } finally { + System.err.println(behavior + " connection.close "); connection.close(); } } @@ -713,8 +753,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio @Override public String toString() { - return String.format("WSChannel@%x{%s,%s,af=%b,i/o=%d/%d,fs=%d}->%s", + return String.format("WSChannel@%x{%s,%s,%s,af=%b,i/o=%d/%d,fs=%d}->%s", hashCode(), + behavior, channelState, negotiated, autoFragment, @@ -723,4 +764,12 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio maxFrameSize, handler); } + + static class AbnormalCloseStatus extends CloseStatus + { + public AbnormalCloseStatus(int statusCode, String reasonPhrase) + { + super(statusCode, reasonPhrase); + } + } } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java index 7b88e30dcc7..815959e940a 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java @@ -68,7 +68,11 @@ public class WebSocketChannelState @Override public String toString() { - return _channelState.toString(); + return String.format("%s@%x{%s,i=%s,o=%s,c=%s}",getClass().getSimpleName(),hashCode(), + _channelState, + OpCode.name(_incomingContinuation), + OpCode.name(_outgoingContinuation), + _closeStatus); } @@ -126,20 +130,31 @@ public class WebSocketChannelState synchronized (this) { if (!isOutputOpen()) + { + if (opcode == OpCode.CLOSE && CloseStatus.getCloseStatus(frame) instanceof WebSocketChannel.AbnormalCloseStatus) + _channelState = State.CLOSED; throw new IllegalStateException(_channelState.toString()); + } if (opcode == OpCode.CLOSE) { _closeStatus = CloseStatus.getCloseStatus(frame); + if (_closeStatus instanceof WebSocketChannel.AbnormalCloseStatus) + { + _channelState = State.CLOSED; + return true; + } switch (_channelState) { case OPEN: _channelState = State.OSHUT; return false; + case ISHUT: _channelState = State.CLOSED; return true; + default: throw new IllegalStateException(_channelState.toString()); }