From ff1f3ca3be9df971c7f55eb636e5b46ac904ce39 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 7 Feb 2019 09:58:08 +1100 Subject: [PATCH] Issue #3290 - WebSocket read and writer error handling when the WSConnection reads EOF it now notifies the WSChannel the channel instead of handling it locally fixed FlusherFlusher failure issues fixed issue with the WebSocketCloseTest expecting close reason Signed-off-by: Lachlan Roberts --- .../eclipse/jetty/io/AbstractEndPoint.java | 2 +- .../jetty/websocket/core/CloseStatus.java | 9 ++-- .../websocket/core/internal/FrameFlusher.java | 43 +++++++++++++++---- .../core/internal/WebSocketChannel.java | 8 ++-- .../core/internal/WebSocketChannelState.java | 19 ++++++++ .../core/internal/WebSocketConnection.java | 24 +++++------ .../websocket/core/WebSocketCloseTest.java | 2 - 7 files changed, 75 insertions(+), 32 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 70e27ed6876..d1cda54bdef 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -175,7 +175,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint close(null); } - protected final void close(Throwable failure) + public final void close(Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug("close({}) {}",failure,this); diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/CloseStatus.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/CloseStatus.java index a0c435d1d33..84bb316dbf4 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/CloseStatus.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/CloseStatus.java @@ -18,14 +18,14 @@ package org.eclipse.jetty.websocket.core; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Utf8Appendable; -import org.eclipse.jetty.util.Utf8StringBuilder; - import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Utf8Appendable; +import org.eclipse.jetty.util.Utf8StringBuilder; + /** * Representation of a WebSocket Close (status code & reason) */ @@ -45,6 +45,7 @@ public class CloseStatus public static final int FAILED_TLS_HANDSHAKE = 1015; public static final CloseStatus NO_CODE_STATUS = new CloseStatus(NO_CODE); + public static final CloseStatus NO_CLOSE_STATUS = new CloseStatus(NO_CLOSE); public static final CloseStatus NORMAL_STATUS = new CloseStatus(NORMAL); static final int MAX_REASON_PHRASE = Frame.MAX_CONTROL_PAYLOAD - 2; diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java index 211c50c9668..2a962087a86 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java @@ -18,14 +18,15 @@ package org.eclipse.jetty.websocket.core.internal; -import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.Objects; +import org.eclipse.jetty.io.AbstractEndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; @@ -50,6 +51,7 @@ public class FrameFlusher extends IteratingCallback private final List entries; private final List buffers; private ByteBuffer batchBuffer = null; + private Throwable closedCause; public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather) { @@ -62,29 +64,39 @@ public class FrameFlusher extends IteratingCallback this.buffers = new ArrayList<>((maxGather * 2) + 1); } - public void enqueue(Frame frame, Callback callback, boolean batch) + public boolean enqueue(Frame frame, Callback callback, boolean batch) { Entry entry = new Entry(frame, callback, batch); byte opCode = frame.getOpCode(); + Throwable failure = null; + synchronized (this) { - if (opCode == OpCode.PING || opCode == OpCode.PONG) + if (closedCause != null) + failure = closedCause; + else if (opCode == OpCode.PING || opCode == OpCode.PONG) queue.offerFirst(entry); else queue.offerLast(entry); } + + if (failure != null) + callback.failed(failure); + + return failure==null; } - public void onClose() + public void onClose(Throwable t) { - Throwable cause = null; + if (t == null) + t = new ClosedChannelException(); + synchronized (this) { - if (!queue.isEmpty()) - cause = new IOException("Closed"); + closedCause = t; } - if (cause!=null) - onCompleteFailure(cause); + + iterate(); } @Override @@ -96,6 +108,9 @@ public class FrameFlusher extends IteratingCallback boolean flush = false; synchronized (this) { + if (closedCause != null) + throw closedCause; + // Succeed entries from previous call to process // and clear batchBuffer if we wrote it. if (succeedEntries() && batchBuffer != null) @@ -222,11 +237,17 @@ public class FrameFlusher extends IteratingCallback @Override public void onCompleteFailure(Throwable failure) { + BufferUtil.clear(batchBuffer); releaseAggregate(); synchronized (this) { entries.addAll(queue); queue.clear(); + + if (closedCause == null) + closedCause = failure; + else if (closedCause != failure) + closedCause.addSuppressed(failure); } for (Entry entry : entries) @@ -235,6 +256,10 @@ public class FrameFlusher extends IteratingCallback entry.release(); } entries.clear(); + if (endPoint instanceof AbstractEndPoint) + ((AbstractEndPoint)endPoint).close(failure); + else + endPoint.close(); } private void releaseAggregate() diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java index 6e615b329dd..4f32c503a25 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.net.URI; +import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.ArrayDeque; import java.util.List; @@ -291,11 +292,10 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio return this.connection.getBufferPool(); } - public void onClosed(Throwable cause) + public void onEof() { - CloseStatus closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, cause == null?null:cause.toString()); - if (channelState.onClosed(closeStatus)) - closeConnection(cause, closeStatus, NOOP); + if (channelState.onEof()) + closeConnection(new ClosedChannelException(), channelState.getCloseStatus(), Callback.NOOP); } public void closeConnection(Throwable cause, CloseStatus closeStatus, Callback callback) diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java index eceec214f69..05be2dadb3d 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java @@ -122,6 +122,25 @@ public class WebSocketChannelState } } + public boolean onEof() + { + synchronized (this) + { + switch (_channelState) + { + case CLOSED: + case ISHUT: + return false; + + default: + if (_closeStatus == null || CloseStatus.isOrdinary(_closeStatus)) + _closeStatus = CloseStatus.NO_CLOSE_STATUS; + _channelState = State.CLOSED; + return true; + } + } + } + public boolean onOutgoingFrame(Frame frame) throws ProtocolException { byte opcode = frame.getOpCode(); diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java index cc393c31e1a..403197ea5ec 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.core.internal; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.util.Objects; import java.util.Random; import java.util.concurrent.Executor; @@ -167,12 +168,12 @@ public class WebSocketConnection extends AbstractConnection implements Connectio if (LOG.isDebugEnabled()) LOG.debug("onClose() of physical connection"); + Throwable t = new ClosedChannelException(); + if (!channel.isClosed()) - { - IOException e = new IOException("Closed"); - channel.onClosed(e); - } - flusher.onClose(); + channel.onEof(); + + flusher.onClose(t); super.onClose(); } @@ -410,7 +411,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio onFrame(frame); if (!moreDemand()) - { return; } } @@ -436,7 +436,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio if (filled < 0) { releaseNetworkBuffer(); - channel.onClosed(new IOException("Read EOF")); + channel.onEof(); return; } @@ -595,13 +595,13 @@ public class WebSocketConnection extends AbstractConnection implements Connectio { if (channel.getBehavior() == Behavior.CLIENT) { - Frame wsf = frame; byte[] mask = new byte[4]; random.nextBytes(mask); - wsf.setMask(mask); + frame.setMask(mask); } - flusher.enqueue(frame, callback, batch); - flusher.iterate(); + + if (flusher.enqueue(frame, callback, batch)) + flusher.iterate(); } private class Flusher extends FrameFlusher @@ -615,7 +615,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio public void onCompleteFailure(Throwable x) { super.onCompleteFailure(x); - channel.processConnectionError(x,NOOP); + channel.processConnectionError(x, NOOP); } } } diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java index ca72b60db6b..f9855b6574b 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java @@ -293,7 +293,6 @@ public class WebSocketCloseTest extends WebSocketTester server.handler.getCoreSession().demand(1); assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS)); assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE)); - assertThat(server.handler.closeStatus.getReason(), containsString("IOException")); } @Test @@ -306,7 +305,6 @@ public class WebSocketCloseTest extends WebSocketTester server.handler.getCoreSession().demand(1); assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS)); assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE)); - assertThat(server.handler.closeStatus.getReason(), containsString("IOException")); } @Test