diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 70e27ed6876..d1cda54bdef 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -175,7 +175,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint close(null); } - protected final void close(Throwable failure) + public final void close(Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug("close({}) {}",failure,this); diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/CloseStatus.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/CloseStatus.java index a0c435d1d33..dc49b31f1dd 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/CloseStatus.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/CloseStatus.java @@ -18,14 +18,14 @@ package org.eclipse.jetty.websocket.core; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Utf8Appendable; -import org.eclipse.jetty.util.Utf8StringBuilder; - import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Utf8Appendable; +import org.eclipse.jetty.util.Utf8StringBuilder; + /** * Representation of a WebSocket Close (status code & reason) */ @@ -45,6 +45,7 @@ public class CloseStatus public static final int FAILED_TLS_HANDSHAKE = 1015; public static final CloseStatus NO_CODE_STATUS = new CloseStatus(NO_CODE); + public static final CloseStatus NO_CLOSE_STATUS = new CloseStatus(NO_CLOSE); public static final CloseStatus NORMAL_STATUS = new CloseStatus(NORMAL); static final int MAX_REASON_PHRASE = Frame.MAX_CONTROL_PAYLOAD - 2; @@ -171,6 +172,21 @@ public class CloseStatus return null; } + // TODO consider defining a precedence for every CloseStatus, and change ChannelState only if higher precedence + public static boolean isOrdinary(CloseStatus closeStatus) + { + switch (closeStatus.getCode()) + { + case NORMAL: + case SHUTDOWN: + case NO_CODE: + return true; + + default: + return false; + } + } + public int getCode() { return code; diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java index 211c50c9668..ad4da0bc9ee 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java @@ -18,14 +18,15 @@ package org.eclipse.jetty.websocket.core.internal; -import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.Objects; +import org.eclipse.jetty.io.AbstractEndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; @@ -50,6 +51,7 @@ public class FrameFlusher extends IteratingCallback private final List entries; private final List buffers; private ByteBuffer batchBuffer = null; + private Throwable closedCause; public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather) { @@ -62,29 +64,48 @@ public class FrameFlusher extends IteratingCallback this.buffers = new ArrayList<>((maxGather * 2) + 1); } - public void enqueue(Frame frame, Callback callback, boolean batch) + + /** + * Enqueue a Frame to be written to the endpoint. + * @param frame The frame to queue + * @param callback The callback to call once the frame is sent + * @param batch True if batch mode is to be used + * @return returns true if the frame was enqueued and iterate needs to be called, returns false if the + * FrameFlusher was closed + */ + public boolean enqueue(Frame frame, Callback callback, boolean batch) { Entry entry = new Entry(frame, callback, batch); byte opCode = frame.getOpCode(); + Throwable failure = null; + synchronized (this) { - if (opCode == OpCode.PING || opCode == OpCode.PONG) + if (closedCause != null) + failure = closedCause; + else if (opCode == OpCode.PING || opCode == OpCode.PONG) queue.offerFirst(entry); else queue.offerLast(entry); } + + if (failure != null) + callback.failed(failure); + + return failure==null; } - public void onClose() + public void onClose(Throwable t) { - Throwable cause = null; + if (t == null) + t = new ClosedChannelException(); + synchronized (this) { - if (!queue.isEmpty()) - cause = new IOException("Closed"); + closedCause = t; } - if (cause!=null) - onCompleteFailure(cause); + + iterate(); } @Override @@ -96,6 +117,9 @@ public class FrameFlusher extends IteratingCallback boolean flush = false; synchronized (this) { + if (closedCause != null) + throw closedCause; + // Succeed entries from previous call to process // and clear batchBuffer if we wrote it. if (succeedEntries() && batchBuffer != null) @@ -222,11 +246,17 @@ public class FrameFlusher extends IteratingCallback @Override public void onCompleteFailure(Throwable failure) { + BufferUtil.clear(batchBuffer); releaseAggregate(); synchronized (this) { entries.addAll(queue); queue.clear(); + + if (closedCause == null) + closedCause = failure; + else if (closedCause != failure) + closedCause.addSuppressed(failure); } for (Entry entry : entries) @@ -235,6 +265,10 @@ public class FrameFlusher extends IteratingCallback entry.release(); } entries.clear(); + if (endPoint instanceof AbstractEndPoint) + ((AbstractEndPoint)endPoint).close(failure); + else + endPoint.close(); } private void releaseAggregate() diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java index 6e615b329dd..2002d82dcae 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.net.URI; +import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.ArrayDeque; import java.util.List; @@ -291,11 +292,10 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio return this.connection.getBufferPool(); } - public void onClosed(Throwable cause) + public void onEof() { - CloseStatus closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, cause == null?null:cause.toString()); - if (channelState.onClosed(closeStatus)) - closeConnection(cause, closeStatus, NOOP); + if (channelState.onEof()) + closeConnection(new ClosedChannelException(), channelState.getCloseStatus(), Callback.NOOP); } public void closeConnection(Throwable cause, CloseStatus closeStatus, Callback callback) @@ -527,7 +527,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio if (frame.getOpCode() == OpCode.CLOSE) { CloseStatus closeStatus = CloseStatus.getCloseStatus(frame); - if (closeStatus instanceof AbnormalCloseStatus) + if (closeStatus instanceof AbnormalCloseStatus && channelState.onClosed(closeStatus)) closeConnection(null, closeStatus, Callback.from( ()->callback.failed(ex), x2-> diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java index eceec214f69..821f5c289fc 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java @@ -122,6 +122,25 @@ public class WebSocketChannelState } } + public boolean onEof() + { + synchronized (this) + { + switch (_channelState) + { + case CLOSED: + case ISHUT: + return false; + + default: + if (_closeStatus == null || CloseStatus.isOrdinary(_closeStatus)) + _closeStatus = CloseStatus.NO_CLOSE_STATUS; + _channelState = State.CLOSED; + return true; + } + } + } + public boolean onOutgoingFrame(Frame frame) throws ProtocolException { byte opcode = frame.getOpCode(); @@ -130,11 +149,7 @@ public class WebSocketChannelState synchronized (this) { if (!isOutputOpen()) - { - if (opcode == OpCode.CLOSE && CloseStatus.getCloseStatus(frame) instanceof WebSocketChannel.AbnormalCloseStatus) - _channelState = State.CLOSED; throw new IllegalStateException(_channelState.toString()); - } if (opcode == OpCode.CLOSE) { diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java index cc393c31e1a..1879e386dad 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.core.internal; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.util.Objects; import java.util.Random; import java.util.concurrent.Executor; @@ -167,12 +168,12 @@ public class WebSocketConnection extends AbstractConnection implements Connectio if (LOG.isDebugEnabled()) LOG.debug("onClose() of physical connection"); + Throwable t = new ClosedChannelException(); + if (!channel.isClosed()) - { - IOException e = new IOException("Closed"); - channel.onClosed(e); - } - flusher.onClose(); + channel.onEof(); + + flusher.onClose(t); super.onClose(); } @@ -347,12 +348,10 @@ public class WebSocketConnection extends AbstractConnection implements Connectio if (!fillingAndParsing) throw new IllegalStateException(); - if (demand > 0) + if (demand != 0) //if demand was canceled, this creates synthetic demand in order to read until EOF return true; - if (demand == 0) - fillingAndParsing = false; - + fillingAndParsing = false; if (networkBuffer.isEmpty()) releaseNetworkBuffer(); @@ -372,10 +371,9 @@ public class WebSocketConnection extends AbstractConnection implements Connectio if (!fillingAndParsing) throw new IllegalStateException(); - if (demand < 0) - return false; + if (demand > 0) + demand--; - demand--; return true; } } @@ -410,9 +408,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio onFrame(frame); if (!moreDemand()) - { return; - } } // buffer must be empty here because parser is fully consuming @@ -436,7 +432,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio if (filled < 0) { releaseNetworkBuffer(); - channel.onClosed(new IOException("Read EOF")); + channel.onEof(); return; } @@ -532,43 +528,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio generator); } - @Override - public int hashCode() - { - final int prime = 31; - int result = 1; - - EndPoint endp = getEndPoint(); - if (endp != null) - { - result = prime * result + endp.getLocalAddress().hashCode(); - result = prime * result + endp.getRemoteAddress().hashCode(); - } - return result; - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - WebSocketConnection other = (WebSocketConnection)obj; - EndPoint endp = getEndPoint(); - EndPoint otherEndp = other.getEndPoint(); - if (endp == null) - { - if (otherEndp != null) - return false; - } - else if (!endp.equals(otherEndp)) - return false; - return true; - } - /** * Extra bytes from the initial HTTP upgrade that need to * be processed by the websocket parser before starting @@ -595,13 +554,13 @@ public class WebSocketConnection extends AbstractConnection implements Connectio { if (channel.getBehavior() == Behavior.CLIENT) { - Frame wsf = frame; byte[] mask = new byte[4]; random.nextBytes(mask); - wsf.setMask(mask); + frame.setMask(mask); } - flusher.enqueue(frame, callback, batch); - flusher.iterate(); + + if (flusher.enqueue(frame, callback, batch)) + flusher.iterate(); } private class Flusher extends FrameFlusher @@ -615,7 +574,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio public void onCompleteFailure(Throwable x) { super.onCompleteFailure(x); - channel.processConnectionError(x,NOOP); + channel.processConnectionError(x, NOOP); } } } diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java index ca72b60db6b..bf846d598df 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java @@ -118,7 +118,6 @@ public class WebSocketCloseTest extends WebSocketTester server.handler.getCoreSession().demand(1); client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true)); Frame frame = serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS); - assertNotNull(frame); assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL)); assertThat(server.handler.getCoreSession().toString(), containsString("ISHUT")); @@ -143,9 +142,8 @@ public class WebSocketCloseTest extends WebSocketTester } server.sendFrame(CloseStatus.toFrame(CloseStatus.NORMAL)); - Frame frame = receiveFrame(client.getInputStream()); - assertNotNull(frame); - assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL)); + CloseStatus closeStatus = new CloseStatus(receiveFrame(client.getInputStream())); + assertThat(closeStatus.getCode(), is(CloseStatus.NORMAL)); assertThat(server.handler.getCoreSession().toString(), containsString("OSHUT")); LOG.info("Server: OSHUT"); @@ -162,7 +160,6 @@ public class WebSocketCloseTest extends WebSocketTester server.handler.receivedCallback.poll().succeeded(); Frame frame = receiveFrame(client.getInputStream()); - assertNotNull(frame); assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL)); assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS)); @@ -177,7 +174,6 @@ public class WebSocketCloseTest extends WebSocketTester server.sendFrame(CloseStatus.toFrame(CloseStatus.SHUTDOWN)); server.handler.receivedCallback.poll().succeeded(); Frame frame = receiveFrame(client.getInputStream()); - assertNotNull(frame); assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.SHUTDOWN)); assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS)); @@ -190,14 +186,27 @@ public class WebSocketCloseTest extends WebSocketTester setup(State.ISHUT); server.handler.receivedCallback.poll().failed(new Exception("test failure")); - Frame frame = receiveFrame(client.getInputStream()); - assertNotNull(frame); - assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.SERVER_ERROR)); + CloseStatus closeStatus = new CloseStatus(receiveFrame(client.getInputStream())); + assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR)); + assertThat(closeStatus.getReason(), is("test failure")); assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS)); assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR)); } + @Test + public void clientClosesOutput_ISHUT() throws Exception + { + setup(State.ISHUT); + + client.shutdownOutput(); + assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS)); + server.handler.receivedCallback.poll().succeeded(); + + CloseStatus closeStatus = new CloseStatus(receiveFrame(client.getInputStream())); + assertThat(closeStatus.getCode(), is(CloseStatus.NORMAL)); + } + @Test public void clientClose_OSHUT() throws Exception { @@ -276,11 +285,57 @@ public class WebSocketCloseTest extends WebSocketTester setup(State.ISHUT); client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false)); - assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS)); + assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS)); + assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.PROTOCOL)); - server.close(); + Frame frame = receiveFrame(client.getInputStream()); + assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.PROTOCOL)); + receiveEof(client.getInputStream()); + } + + @Test + public void clientHalfClose_ISHUT() throws Exception + { + setup(State.ISHUT); + + client.shutdownOutput(); + assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS)); + Callback callback = server.handler.receivedCallback.poll(5, TimeUnit.SECONDS); + + callback.succeeded(); assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS)); assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL)); + + Frame frame = receiveFrame(client.getInputStream()); + assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NORMAL)); + receiveEof(client.getInputStream()); + } + + @Test + public void clientCloseServerWrite_ISHUT() throws Exception + { + setup(State.ISHUT); + + client.close(); + assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS)); + + while(true) + { + if (!server.isOpen()) + break; + + Callback callback = Callback.from(()->System.err.println("Succeeded Frame After Close"), + (t)->System.err.println("Failed Frame After Close")); + server.sendFrame(new Frame(OpCode.TEXT, BufferUtil.toBuffer("frame after close")), callback); + } + + assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS)); + assertNotNull(server.handler.error); + assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR)); + + Callback callback = server.handler.receivedCallback.poll(5, TimeUnit.SECONDS); + callback.succeeded(); + assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR)); } @Test @@ -293,7 +348,6 @@ public class WebSocketCloseTest extends WebSocketTester server.handler.getCoreSession().demand(1); assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS)); assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE)); - assertThat(server.handler.closeStatus.getReason(), containsString("IOException")); } @Test @@ -306,7 +360,6 @@ public class WebSocketCloseTest extends WebSocketTester server.handler.getCoreSession().demand(1); assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS)); assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE)); - assertThat(server.handler.closeStatus.getReason(), containsString("IOException")); } @Test @@ -362,6 +415,7 @@ public class WebSocketCloseTest extends WebSocketTester protected BlockingQueue receivedFrames = new BlockingArrayQueue<>(); protected BlockingQueue receivedCallback = new BlockingArrayQueue<>(); + protected volatile Throwable error = null; protected CountDownLatch opened = new CountDownLatch(1); protected CountDownLatch closed = new CountDownLatch(1); protected CloseStatus closeStatus = null; @@ -410,6 +464,7 @@ public class WebSocketCloseTest extends WebSocketTester public void onError(Throwable cause) { LOG.info("onError {} ", cause == null?null:cause.toString()); + error = cause; state = session.toString(); } @@ -477,6 +532,11 @@ public class WebSocketCloseTest extends WebSocketTester handler.getCoreSession().sendFrame(frame, NOOP, false); } + public void sendFrame(Frame frame, Callback callback) + { + handler.getCoreSession().sendFrame(frame, callback, false); + } + public void sendText(String line) { LOG.info("sending {}...", line); diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketTester.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketTester.java index 04279d4724a..1ecb8c99e3b 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketTester.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketTester.java @@ -18,6 +18,13 @@ package org.eclipse.jetty.websocket.core; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.io.ArrayByteBufferPool; @@ -27,13 +34,6 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.core.internal.Parser; import org.junit.jupiter.api.BeforeEach; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.startsWith; @@ -124,4 +124,18 @@ public class WebSocketTester return frame; } } + + protected void receiveEof(InputStream in) throws IOException + { + ByteBuffer buffer = bufferPool.acquire(4096, false); + while (true) + { + BufferUtil.flipToFill(buffer); + int len = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + if (len < 0) + return; + + throw new IllegalStateException("unexpected content"); + } + } }