Merge branch 'jetty-10.0.x-3290-websocket-close' of https://github.com/lachlan-roberts/jetty.project into jetty-10.0.x-3290-websocket-close

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-02-07 11:49:33 +11:00
commit 2a69fcc7ac
8 changed files with 187 additions and 100 deletions

View File

@ -175,7 +175,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
close(null);
}
protected final void close(Throwable failure)
public final void close(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("close({}) {}",failure,this);

View File

@ -18,14 +18,14 @@
package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.Utf8StringBuilder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.Utf8StringBuilder;
/**
* Representation of a WebSocket Close (status code &amp; reason)
*/
@ -45,6 +45,7 @@ public class CloseStatus
public static final int FAILED_TLS_HANDSHAKE = 1015;
public static final CloseStatus NO_CODE_STATUS = new CloseStatus(NO_CODE);
public static final CloseStatus NO_CLOSE_STATUS = new CloseStatus(NO_CLOSE);
public static final CloseStatus NORMAL_STATUS = new CloseStatus(NORMAL);
static final int MAX_REASON_PHRASE = Frame.MAX_CONTROL_PAYLOAD - 2;
@ -171,6 +172,19 @@ public class CloseStatus
return null;
}
public static boolean isOrdinary(CloseStatus closeStatus)
{
switch (closeStatus.getCode())
{
case NORMAL:
case SHUTDOWN:
return true;
default:
return false;
}
}
public int getCode()
{
return code;

View File

@ -18,14 +18,15 @@
package org.eclipse.jetty.websocket.core.internal;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
@ -50,6 +51,7 @@ public class FrameFlusher extends IteratingCallback
private final List<Entry> entries;
private final List<ByteBuffer> buffers;
private ByteBuffer batchBuffer = null;
private Throwable closedCause;
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
{
@ -62,29 +64,39 @@ public class FrameFlusher extends IteratingCallback
this.buffers = new ArrayList<>((maxGather * 2) + 1);
}
public void enqueue(Frame frame, Callback callback, boolean batch)
public boolean enqueue(Frame frame, Callback callback, boolean batch)
{
Entry entry = new Entry(frame, callback, batch);
byte opCode = frame.getOpCode();
Throwable failure = null;
synchronized (this)
{
if (opCode == OpCode.PING || opCode == OpCode.PONG)
if (closedCause != null)
failure = closedCause;
else if (opCode == OpCode.PING || opCode == OpCode.PONG)
queue.offerFirst(entry);
else
queue.offerLast(entry);
}
if (failure != null)
callback.failed(failure);
return failure==null;
}
public void onClose()
public void onClose(Throwable t)
{
Throwable cause = null;
if (t == null)
t = new ClosedChannelException();
synchronized (this)
{
if (!queue.isEmpty())
cause = new IOException("Closed");
closedCause = t;
}
if (cause!=null)
onCompleteFailure(cause);
iterate();
}
@Override
@ -96,6 +108,9 @@ public class FrameFlusher extends IteratingCallback
boolean flush = false;
synchronized (this)
{
if (closedCause != null)
throw closedCause;
// Succeed entries from previous call to process
// and clear batchBuffer if we wrote it.
if (succeedEntries() && batchBuffer != null)
@ -222,11 +237,17 @@ public class FrameFlusher extends IteratingCallback
@Override
public void onCompleteFailure(Throwable failure)
{
BufferUtil.clear(batchBuffer);
releaseAggregate();
synchronized (this)
{
entries.addAll(queue);
queue.clear();
if (closedCause == null)
closedCause = failure;
else if (closedCause != failure)
closedCause.addSuppressed(failure);
}
for (Entry entry : entries)
@ -235,6 +256,10 @@ public class FrameFlusher extends IteratingCallback
entry.release();
}
entries.clear();
if (endPoint instanceof AbstractEndPoint)
((AbstractEndPoint)endPoint).close(failure);
else
endPoint.close();
}
private void releaseAggregate()

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.List;
@ -291,11 +292,10 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
return this.connection.getBufferPool();
}
public void onClosed(Throwable cause)
public void onEof()
{
CloseStatus closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, cause == null?null:cause.toString());
if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus, NOOP);
if (channelState.onEof())
closeConnection(new ClosedChannelException(), channelState.getCloseStatus(), Callback.NOOP);
}
public void closeConnection(Throwable cause, CloseStatus closeStatus, Callback callback)
@ -527,7 +527,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (closeStatus instanceof AbnormalCloseStatus)
if (closeStatus instanceof AbnormalCloseStatus && channelState.onClosed(closeStatus))
closeConnection(null, closeStatus, Callback.from(
()->callback.failed(ex),
x2->

View File

@ -122,6 +122,25 @@ public class WebSocketChannelState
}
}
public boolean onEof()
{
synchronized (this)
{
switch (_channelState)
{
case CLOSED:
case ISHUT:
return false;
default:
if (_closeStatus == null || CloseStatus.isOrdinary(_closeStatus))
_closeStatus = CloseStatus.NO_CLOSE_STATUS;
_channelState = State.CLOSED;
return true;
}
}
}
public boolean onOutgoingFrame(Frame frame) throws ProtocolException
{
byte opcode = frame.getOpCode();
@ -130,11 +149,7 @@ public class WebSocketChannelState
synchronized (this)
{
if (!isOutputOpen())
{
if (opcode == OpCode.CLOSE && CloseStatus.getCloseStatus(frame) instanceof WebSocketChannel.AbnormalCloseStatus)
_channelState = State.CLOSED;
throw new IllegalStateException(_channelState.toString());
}
if (opcode == OpCode.CLOSE)
{

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.core.internal;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Executor;
@ -167,12 +168,12 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
if (LOG.isDebugEnabled())
LOG.debug("onClose() of physical connection");
Throwable t = new ClosedChannelException();
if (!channel.isClosed())
{
IOException e = new IOException("Closed");
channel.onClosed(e);
}
flusher.onClose();
channel.onEof();
flusher.onClose(t);
super.onClose();
}
@ -347,12 +348,10 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
if (!fillingAndParsing)
throw new IllegalStateException();
if (demand > 0)
if (demand != 0)
return true;
if (demand == 0)
fillingAndParsing = false;
fillingAndParsing = false;
if (networkBuffer.isEmpty())
releaseNetworkBuffer();
@ -372,10 +371,9 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
if (!fillingAndParsing)
throw new IllegalStateException();
if (demand < 0)
return false;
if (demand > 0)
demand--;
demand--;
return true;
}
}
@ -410,9 +408,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
onFrame(frame);
if (!moreDemand())
{
return;
}
}
// buffer must be empty here because parser is fully consuming
@ -436,7 +432,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
if (filled < 0)
{
releaseNetworkBuffer();
channel.onClosed(new IOException("Read EOF"));
channel.onEof();
return;
}
@ -532,43 +528,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
generator);
}
@Override
public int hashCode()
{
final int prime = 31;
int result = 1;
EndPoint endp = getEndPoint();
if (endp != null)
{
result = prime * result + endp.getLocalAddress().hashCode();
result = prime * result + endp.getRemoteAddress().hashCode();
}
return result;
}
@Override
public boolean equals(Object obj)
{
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
WebSocketConnection other = (WebSocketConnection)obj;
EndPoint endp = getEndPoint();
EndPoint otherEndp = other.getEndPoint();
if (endp == null)
{
if (otherEndp != null)
return false;
}
else if (!endp.equals(otherEndp))
return false;
return true;
}
/**
* Extra bytes from the initial HTTP upgrade that need to
* be processed by the websocket parser before starting
@ -595,13 +554,13 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
{
if (channel.getBehavior() == Behavior.CLIENT)
{
Frame wsf = frame;
byte[] mask = new byte[4];
random.nextBytes(mask);
wsf.setMask(mask);
frame.setMask(mask);
}
flusher.enqueue(frame, callback, batch);
flusher.iterate();
if (flusher.enqueue(frame, callback, batch))
flusher.iterate();
}
private class Flusher extends FrameFlusher
@ -615,7 +574,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
public void onCompleteFailure(Throwable x)
{
super.onCompleteFailure(x);
channel.processConnectionError(x,NOOP);
channel.processConnectionError(x, NOOP);
}
}
}

View File

@ -118,7 +118,6 @@ public class WebSocketCloseTest extends WebSocketTester
server.handler.getCoreSession().demand(1);
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
Frame frame = serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL));
assertThat(server.handler.getCoreSession().toString(), containsString("ISHUT"));
@ -143,9 +142,8 @@ public class WebSocketCloseTest extends WebSocketTester
}
server.sendFrame(CloseStatus.toFrame(CloseStatus.NORMAL));
Frame frame = receiveFrame(client.getInputStream());
assertNotNull(frame);
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL));
CloseStatus closeStatus = new CloseStatus(receiveFrame(client.getInputStream()));
assertThat(closeStatus.getCode(), is(CloseStatus.NORMAL));
assertThat(server.handler.getCoreSession().toString(), containsString("OSHUT"));
LOG.info("Server: OSHUT");
@ -162,7 +160,6 @@ public class WebSocketCloseTest extends WebSocketTester
server.handler.receivedCallback.poll().succeeded();
Frame frame = receiveFrame(client.getInputStream());
assertNotNull(frame);
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL));
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
@ -177,7 +174,6 @@ public class WebSocketCloseTest extends WebSocketTester
server.sendFrame(CloseStatus.toFrame(CloseStatus.SHUTDOWN));
server.handler.receivedCallback.poll().succeeded();
Frame frame = receiveFrame(client.getInputStream());
assertNotNull(frame);
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.SHUTDOWN));
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
@ -190,14 +186,27 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.ISHUT);
server.handler.receivedCallback.poll().failed(new Exception("test failure"));
Frame frame = receiveFrame(client.getInputStream());
assertNotNull(frame);
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.SERVER_ERROR));
CloseStatus closeStatus = new CloseStatus(receiveFrame(client.getInputStream()));
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("test failure"));
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
}
@Test
public void clientClosesOutput_ISHUT() throws Exception
{
setup(State.ISHUT);
client.shutdownOutput();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
server.handler.receivedCallback.poll().succeeded();
CloseStatus closeStatus = new CloseStatus(receiveFrame(client.getInputStream()));
assertThat(closeStatus.getCode(), is(CloseStatus.NORMAL));
}
@Test
public void clientClose_OSHUT() throws Exception
{
@ -276,11 +285,57 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.ISHUT);
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false));
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.PROTOCOL));
server.close();
Frame frame = receiveFrame(client.getInputStream());
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.PROTOCOL));
receiveEof(client.getInputStream());
}
@Test
public void clientHalfClose_ISHUT() throws Exception
{
setup(State.ISHUT);
client.shutdownOutput();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
Callback callback = server.handler.receivedCallback.poll(5, TimeUnit.SECONDS);
callback.succeeded();
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
Frame frame = receiveFrame(client.getInputStream());
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NORMAL));
receiveEof(client.getInputStream());
}
@Test
public void clientCloseServerWrite_ISHUT() throws Exception
{
setup(State.ISHUT);
client.close();
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
while(true)
{
if (!server.isOpen())
break;
Callback callback = Callback.from(()->System.err.println("Succeeded Frame After Close"),
(t)->System.err.println("Failed Frame After Close"));
server.sendFrame(new Frame(OpCode.TEXT, BufferUtil.toBuffer("frame after close")), callback);
}
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertNotNull(server.handler.error);
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
Callback callback = server.handler.receivedCallback.poll(5, TimeUnit.SECONDS);
callback.succeeded();
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
}
@Test
@ -293,7 +348,6 @@ public class WebSocketCloseTest extends WebSocketTester
server.handler.getCoreSession().demand(1);
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
assertThat(server.handler.closeStatus.getReason(), containsString("IOException"));
}
@Test
@ -306,7 +360,6 @@ public class WebSocketCloseTest extends WebSocketTester
server.handler.getCoreSession().demand(1);
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
assertThat(server.handler.closeStatus.getReason(), containsString("IOException"));
}
@Test
@ -362,6 +415,7 @@ public class WebSocketCloseTest extends WebSocketTester
protected BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
protected BlockingQueue<Callback> receivedCallback = new BlockingArrayQueue<>();
protected volatile Throwable error = null;
protected CountDownLatch opened = new CountDownLatch(1);
protected CountDownLatch closed = new CountDownLatch(1);
protected CloseStatus closeStatus = null;
@ -410,6 +464,7 @@ public class WebSocketCloseTest extends WebSocketTester
public void onError(Throwable cause)
{
LOG.info("onError {} ", cause == null?null:cause.toString());
error = cause;
state = session.toString();
}
@ -477,6 +532,11 @@ public class WebSocketCloseTest extends WebSocketTester
handler.getCoreSession().sendFrame(frame, NOOP, false);
}
public void sendFrame(Frame frame, Callback callback)
{
handler.getCoreSession().sendFrame(frame, callback, false);
}
public void sendText(String line)
{
LOG.info("sending {}...", line);

View File

@ -18,6 +18,13 @@
package org.eclipse.jetty.websocket.core;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ArrayByteBufferPool;
@ -27,13 +34,6 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.internal.Parser;
import org.junit.jupiter.api.BeforeEach;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.startsWith;
@ -124,4 +124,18 @@ public class WebSocketTester
return frame;
}
}
protected void receiveEof(InputStream in) throws IOException
{
ByteBuffer buffer = bufferPool.acquire(4096, false);
while (true)
{
BufferUtil.flipToFill(buffer);
int len = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
if (len < 0)
return;
throw new IllegalStateException("unexpected content");
}
}
}