Issue #3290 - WebSocket read and writer error handling

when the WSConnection reads EOF it now notifies the WSChannel
the channel instead of handling it locally

fixed FlusherFlusher failure issues

fixed issue with the WebSocketCloseTest expecting close reason

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-02-07 09:58:08 +11:00
parent 7fca634292
commit ff1f3ca3be
7 changed files with 75 additions and 32 deletions

View File

@ -175,7 +175,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
close(null); close(null);
} }
protected final void close(Throwable failure) public final void close(Throwable failure)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("close({}) {}",failure,this); LOG.debug("close({}) {}",failure,this);

View File

@ -18,14 +18,14 @@
package org.eclipse.jetty.websocket.core; package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.Utf8StringBuilder;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.Utf8StringBuilder;
/** /**
* Representation of a WebSocket Close (status code &amp; reason) * Representation of a WebSocket Close (status code &amp; reason)
*/ */
@ -45,6 +45,7 @@ public class CloseStatus
public static final int FAILED_TLS_HANDSHAKE = 1015; public static final int FAILED_TLS_HANDSHAKE = 1015;
public static final CloseStatus NO_CODE_STATUS = new CloseStatus(NO_CODE); public static final CloseStatus NO_CODE_STATUS = new CloseStatus(NO_CODE);
public static final CloseStatus NO_CLOSE_STATUS = new CloseStatus(NO_CLOSE);
public static final CloseStatus NORMAL_STATUS = new CloseStatus(NORMAL); public static final CloseStatus NORMAL_STATUS = new CloseStatus(NORMAL);
static final int MAX_REASON_PHRASE = Frame.MAX_CONTROL_PAYLOAD - 2; static final int MAX_REASON_PHRASE = Frame.MAX_CONTROL_PAYLOAD - 2;

View File

@ -18,14 +18,15 @@
package org.eclipse.jetty.websocket.core.internal; package org.eclipse.jetty.websocket.core.internal;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Deque; import java.util.Deque;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -50,6 +51,7 @@ public class FrameFlusher extends IteratingCallback
private final List<Entry> entries; private final List<Entry> entries;
private final List<ByteBuffer> buffers; private final List<ByteBuffer> buffers;
private ByteBuffer batchBuffer = null; private ByteBuffer batchBuffer = null;
private Throwable closedCause;
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather) public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
{ {
@ -62,29 +64,39 @@ public class FrameFlusher extends IteratingCallback
this.buffers = new ArrayList<>((maxGather * 2) + 1); this.buffers = new ArrayList<>((maxGather * 2) + 1);
} }
public void enqueue(Frame frame, Callback callback, boolean batch) public boolean enqueue(Frame frame, Callback callback, boolean batch)
{ {
Entry entry = new Entry(frame, callback, batch); Entry entry = new Entry(frame, callback, batch);
byte opCode = frame.getOpCode(); byte opCode = frame.getOpCode();
Throwable failure = null;
synchronized (this) synchronized (this)
{ {
if (opCode == OpCode.PING || opCode == OpCode.PONG) if (closedCause != null)
failure = closedCause;
else if (opCode == OpCode.PING || opCode == OpCode.PONG)
queue.offerFirst(entry); queue.offerFirst(entry);
else else
queue.offerLast(entry); queue.offerLast(entry);
} }
if (failure != null)
callback.failed(failure);
return failure==null;
} }
public void onClose() public void onClose(Throwable t)
{ {
Throwable cause = null; if (t == null)
t = new ClosedChannelException();
synchronized (this) synchronized (this)
{ {
if (!queue.isEmpty()) closedCause = t;
cause = new IOException("Closed");
} }
if (cause!=null)
onCompleteFailure(cause); iterate();
} }
@Override @Override
@ -96,6 +108,9 @@ public class FrameFlusher extends IteratingCallback
boolean flush = false; boolean flush = false;
synchronized (this) synchronized (this)
{ {
if (closedCause != null)
throw closedCause;
// Succeed entries from previous call to process // Succeed entries from previous call to process
// and clear batchBuffer if we wrote it. // and clear batchBuffer if we wrote it.
if (succeedEntries() && batchBuffer != null) if (succeedEntries() && batchBuffer != null)
@ -222,11 +237,17 @@ public class FrameFlusher extends IteratingCallback
@Override @Override
public void onCompleteFailure(Throwable failure) public void onCompleteFailure(Throwable failure)
{ {
BufferUtil.clear(batchBuffer);
releaseAggregate(); releaseAggregate();
synchronized (this) synchronized (this)
{ {
entries.addAll(queue); entries.addAll(queue);
queue.clear(); queue.clear();
if (closedCause == null)
closedCause = failure;
else if (closedCause != failure)
closedCause.addSuppressed(failure);
} }
for (Entry entry : entries) for (Entry entry : entries)
@ -235,6 +256,10 @@ public class FrameFlusher extends IteratingCallback
entry.release(); entry.release();
} }
entries.clear(); entries.clear();
if (endPoint instanceof AbstractEndPoint)
((AbstractEndPoint)endPoint).close(failure);
else
endPoint.close();
} }
private void releaseAggregate() private void releaseAggregate()

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.URI; import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.List; import java.util.List;
@ -291,11 +292,10 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
return this.connection.getBufferPool(); return this.connection.getBufferPool();
} }
public void onClosed(Throwable cause) public void onEof()
{ {
CloseStatus closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, cause == null?null:cause.toString()); if (channelState.onEof())
if (channelState.onClosed(closeStatus)) closeConnection(new ClosedChannelException(), channelState.getCloseStatus(), Callback.NOOP);
closeConnection(cause, closeStatus, NOOP);
} }
public void closeConnection(Throwable cause, CloseStatus closeStatus, Callback callback) public void closeConnection(Throwable cause, CloseStatus closeStatus, Callback callback)

View File

@ -122,6 +122,25 @@ public class WebSocketChannelState
} }
} }
public boolean onEof()
{
synchronized (this)
{
switch (_channelState)
{
case CLOSED:
case ISHUT:
return false;
default:
if (_closeStatus == null || CloseStatus.isOrdinary(_closeStatus))
_closeStatus = CloseStatus.NO_CLOSE_STATUS;
_channelState = State.CLOSED;
return true;
}
}
}
public boolean onOutgoingFrame(Frame frame) throws ProtocolException public boolean onOutgoingFrame(Frame frame) throws ProtocolException
{ {
byte opcode = frame.getOpCode(); byte opcode = frame.getOpCode();

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.core.internal;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Objects; import java.util.Objects;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -167,12 +168,12 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onClose() of physical connection"); LOG.debug("onClose() of physical connection");
Throwable t = new ClosedChannelException();
if (!channel.isClosed()) if (!channel.isClosed())
{ channel.onEof();
IOException e = new IOException("Closed");
channel.onClosed(e); flusher.onClose(t);
}
flusher.onClose();
super.onClose(); super.onClose();
} }
@ -410,7 +411,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
onFrame(frame); onFrame(frame);
if (!moreDemand()) if (!moreDemand())
{
return; return;
} }
} }
@ -436,7 +436,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
if (filled < 0) if (filled < 0)
{ {
releaseNetworkBuffer(); releaseNetworkBuffer();
channel.onClosed(new IOException("Read EOF")); channel.onEof();
return; return;
} }
@ -595,12 +595,12 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
{ {
if (channel.getBehavior() == Behavior.CLIENT) if (channel.getBehavior() == Behavior.CLIENT)
{ {
Frame wsf = frame;
byte[] mask = new byte[4]; byte[] mask = new byte[4];
random.nextBytes(mask); random.nextBytes(mask);
wsf.setMask(mask); frame.setMask(mask);
} }
flusher.enqueue(frame, callback, batch);
if (flusher.enqueue(frame, callback, batch))
flusher.iterate(); flusher.iterate();
} }
@ -615,7 +615,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
public void onCompleteFailure(Throwable x) public void onCompleteFailure(Throwable x)
{ {
super.onCompleteFailure(x); super.onCompleteFailure(x);
channel.processConnectionError(x,NOOP); channel.processConnectionError(x, NOOP);
} }
} }
} }

View File

@ -293,7 +293,6 @@ public class WebSocketCloseTest extends WebSocketTester
server.handler.getCoreSession().demand(1); server.handler.getCoreSession().demand(1);
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS)); assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE)); assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
assertThat(server.handler.closeStatus.getReason(), containsString("IOException"));
} }
@Test @Test
@ -306,7 +305,6 @@ public class WebSocketCloseTest extends WebSocketTester
server.handler.getCoreSession().demand(1); server.handler.getCoreSession().demand(1);
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS)); assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE)); assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
assertThat(server.handler.closeStatus.getReason(), containsString("IOException"));
} }
@Test @Test