Merge pull request #3335 from lachlan-roberts/jetty-10.0.x-3290-websocket-close
Issue #3290 - FrameFlusher and WebSocket close fixes
This commit is contained in:
commit
c14df16494
|
@ -175,7 +175,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
|
|||
close(null);
|
||||
}
|
||||
|
||||
protected final void close(Throwable failure)
|
||||
public final void close(Throwable failure)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("close({}) {}",failure,this);
|
||||
|
|
|
@ -18,14 +18,14 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.core;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Utf8Appendable;
|
||||
import org.eclipse.jetty.util.Utf8StringBuilder;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Utf8Appendable;
|
||||
import org.eclipse.jetty.util.Utf8StringBuilder;
|
||||
|
||||
/**
|
||||
* Representation of a WebSocket Close (status code & reason)
|
||||
*/
|
||||
|
@ -45,6 +45,7 @@ public class CloseStatus
|
|||
public static final int FAILED_TLS_HANDSHAKE = 1015;
|
||||
|
||||
public static final CloseStatus NO_CODE_STATUS = new CloseStatus(NO_CODE);
|
||||
public static final CloseStatus NO_CLOSE_STATUS = new CloseStatus(NO_CLOSE);
|
||||
public static final CloseStatus NORMAL_STATUS = new CloseStatus(NORMAL);
|
||||
|
||||
static final int MAX_REASON_PHRASE = Frame.MAX_CONTROL_PAYLOAD - 2;
|
||||
|
@ -171,6 +172,21 @@ public class CloseStatus
|
|||
return null;
|
||||
}
|
||||
|
||||
// TODO consider defining a precedence for every CloseStatus, and change ChannelState only if higher precedence
|
||||
public static boolean isOrdinary(CloseStatus closeStatus)
|
||||
{
|
||||
switch (closeStatus.getCode())
|
||||
{
|
||||
case NORMAL:
|
||||
case SHUTDOWN:
|
||||
case NO_CODE:
|
||||
return true;
|
||||
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public int getCode()
|
||||
{
|
||||
return code;
|
||||
|
|
|
@ -18,14 +18,15 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.core.internal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Deque;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.eclipse.jetty.io.AbstractEndPoint;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
|
@ -50,6 +51,7 @@ public class FrameFlusher extends IteratingCallback
|
|||
private final List<Entry> entries;
|
||||
private final List<ByteBuffer> buffers;
|
||||
private ByteBuffer batchBuffer = null;
|
||||
private Throwable closedCause;
|
||||
|
||||
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
|
||||
{
|
||||
|
@ -62,29 +64,48 @@ public class FrameFlusher extends IteratingCallback
|
|||
this.buffers = new ArrayList<>((maxGather * 2) + 1);
|
||||
}
|
||||
|
||||
public void enqueue(Frame frame, Callback callback, boolean batch)
|
||||
|
||||
/**
|
||||
* Enqueue a Frame to be written to the endpoint.
|
||||
* @param frame The frame to queue
|
||||
* @param callback The callback to call once the frame is sent
|
||||
* @param batch True if batch mode is to be used
|
||||
* @return returns true if the frame was enqueued and iterate needs to be called, returns false if the
|
||||
* FrameFlusher was closed
|
||||
*/
|
||||
public boolean enqueue(Frame frame, Callback callback, boolean batch)
|
||||
{
|
||||
Entry entry = new Entry(frame, callback, batch);
|
||||
byte opCode = frame.getOpCode();
|
||||
Throwable failure = null;
|
||||
|
||||
synchronized (this)
|
||||
{
|
||||
if (opCode == OpCode.PING || opCode == OpCode.PONG)
|
||||
if (closedCause != null)
|
||||
failure = closedCause;
|
||||
else if (opCode == OpCode.PING || opCode == OpCode.PONG)
|
||||
queue.offerFirst(entry);
|
||||
else
|
||||
queue.offerLast(entry);
|
||||
}
|
||||
|
||||
if (failure != null)
|
||||
callback.failed(failure);
|
||||
|
||||
return failure==null;
|
||||
}
|
||||
|
||||
public void onClose()
|
||||
public void onClose(Throwable t)
|
||||
{
|
||||
Throwable cause = null;
|
||||
if (t == null)
|
||||
t = new ClosedChannelException();
|
||||
|
||||
synchronized (this)
|
||||
{
|
||||
if (!queue.isEmpty())
|
||||
cause = new IOException("Closed");
|
||||
closedCause = t;
|
||||
}
|
||||
if (cause!=null)
|
||||
onCompleteFailure(cause);
|
||||
|
||||
iterate();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -96,6 +117,9 @@ public class FrameFlusher extends IteratingCallback
|
|||
boolean flush = false;
|
||||
synchronized (this)
|
||||
{
|
||||
if (closedCause != null)
|
||||
throw closedCause;
|
||||
|
||||
// Succeed entries from previous call to process
|
||||
// and clear batchBuffer if we wrote it.
|
||||
if (succeedEntries() && batchBuffer != null)
|
||||
|
@ -222,11 +246,17 @@ public class FrameFlusher extends IteratingCallback
|
|||
@Override
|
||||
public void onCompleteFailure(Throwable failure)
|
||||
{
|
||||
BufferUtil.clear(batchBuffer);
|
||||
releaseAggregate();
|
||||
synchronized (this)
|
||||
{
|
||||
entries.addAll(queue);
|
||||
queue.clear();
|
||||
|
||||
if (closedCause == null)
|
||||
closedCause = failure;
|
||||
else if (closedCause != failure)
|
||||
closedCause.addSuppressed(failure);
|
||||
}
|
||||
|
||||
for (Entry entry : entries)
|
||||
|
@ -235,6 +265,10 @@ public class FrameFlusher extends IteratingCallback
|
|||
entry.release();
|
||||
}
|
||||
entries.clear();
|
||||
if (endPoint instanceof AbstractEndPoint)
|
||||
((AbstractEndPoint)endPoint).close(failure);
|
||||
else
|
||||
endPoint.close();
|
||||
}
|
||||
|
||||
private void releaseAggregate()
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
import java.net.SocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.List;
|
||||
|
@ -291,11 +292,10 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
return this.connection.getBufferPool();
|
||||
}
|
||||
|
||||
public void onClosed(Throwable cause)
|
||||
public void onEof()
|
||||
{
|
||||
CloseStatus closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, cause == null?null:cause.toString());
|
||||
if (channelState.onClosed(closeStatus))
|
||||
closeConnection(cause, closeStatus, NOOP);
|
||||
if (channelState.onEof())
|
||||
closeConnection(new ClosedChannelException(), channelState.getCloseStatus(), Callback.NOOP);
|
||||
}
|
||||
|
||||
public void closeConnection(Throwable cause, CloseStatus closeStatus, Callback callback)
|
||||
|
@ -527,7 +527,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
|
||||
if (closeStatus instanceof AbnormalCloseStatus)
|
||||
if (closeStatus instanceof AbnormalCloseStatus && channelState.onClosed(closeStatus))
|
||||
closeConnection(null, closeStatus, Callback.from(
|
||||
()->callback.failed(ex),
|
||||
x2->
|
||||
|
|
|
@ -122,6 +122,25 @@ public class WebSocketChannelState
|
|||
}
|
||||
}
|
||||
|
||||
public boolean onEof()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
switch (_channelState)
|
||||
{
|
||||
case CLOSED:
|
||||
case ISHUT:
|
||||
return false;
|
||||
|
||||
default:
|
||||
if (_closeStatus == null || CloseStatus.isOrdinary(_closeStatus))
|
||||
_closeStatus = CloseStatus.NO_CLOSE_STATUS;
|
||||
_channelState = State.CLOSED;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean onOutgoingFrame(Frame frame) throws ProtocolException
|
||||
{
|
||||
byte opcode = frame.getOpCode();
|
||||
|
@ -130,11 +149,7 @@ public class WebSocketChannelState
|
|||
synchronized (this)
|
||||
{
|
||||
if (!isOutputOpen())
|
||||
{
|
||||
if (opcode == OpCode.CLOSE && CloseStatus.getCloseStatus(frame) instanceof WebSocketChannel.AbnormalCloseStatus)
|
||||
_channelState = State.CLOSED;
|
||||
throw new IllegalStateException(_channelState.toString());
|
||||
}
|
||||
|
||||
if (opcode == OpCode.CLOSE)
|
||||
{
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.core.internal;
|
|||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.Objects;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Executor;
|
||||
|
@ -167,12 +168,12 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onClose() of physical connection");
|
||||
|
||||
Throwable t = new ClosedChannelException();
|
||||
|
||||
if (!channel.isClosed())
|
||||
{
|
||||
IOException e = new IOException("Closed");
|
||||
channel.onClosed(e);
|
||||
}
|
||||
flusher.onClose();
|
||||
channel.onEof();
|
||||
|
||||
flusher.onClose(t);
|
||||
|
||||
super.onClose();
|
||||
}
|
||||
|
@ -347,12 +348,10 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
|||
|
||||
if (!fillingAndParsing)
|
||||
throw new IllegalStateException();
|
||||
if (demand > 0)
|
||||
if (demand != 0) //if demand was canceled, this creates synthetic demand in order to read until EOF
|
||||
return true;
|
||||
|
||||
if (demand == 0)
|
||||
fillingAndParsing = false;
|
||||
|
||||
fillingAndParsing = false;
|
||||
if (networkBuffer.isEmpty())
|
||||
releaseNetworkBuffer();
|
||||
|
||||
|
@ -372,10 +371,9 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
|||
if (!fillingAndParsing)
|
||||
throw new IllegalStateException();
|
||||
|
||||
if (demand < 0)
|
||||
return false;
|
||||
if (demand > 0)
|
||||
demand--;
|
||||
|
||||
demand--;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -410,9 +408,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
|||
onFrame(frame);
|
||||
|
||||
if (!moreDemand())
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// buffer must be empty here because parser is fully consuming
|
||||
|
@ -436,7 +432,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
|||
if (filled < 0)
|
||||
{
|
||||
releaseNetworkBuffer();
|
||||
channel.onClosed(new IOException("Read EOF"));
|
||||
channel.onEof();
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -532,43 +528,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
|||
generator);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
|
||||
EndPoint endp = getEndPoint();
|
||||
if (endp != null)
|
||||
{
|
||||
result = prime * result + endp.getLocalAddress().hashCode();
|
||||
result = prime * result + endp.getRemoteAddress().hashCode();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj)
|
||||
{
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
WebSocketConnection other = (WebSocketConnection)obj;
|
||||
EndPoint endp = getEndPoint();
|
||||
EndPoint otherEndp = other.getEndPoint();
|
||||
if (endp == null)
|
||||
{
|
||||
if (otherEndp != null)
|
||||
return false;
|
||||
}
|
||||
else if (!endp.equals(otherEndp))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extra bytes from the initial HTTP upgrade that need to
|
||||
* be processed by the websocket parser before starting
|
||||
|
@ -595,13 +554,13 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
|||
{
|
||||
if (channel.getBehavior() == Behavior.CLIENT)
|
||||
{
|
||||
Frame wsf = frame;
|
||||
byte[] mask = new byte[4];
|
||||
random.nextBytes(mask);
|
||||
wsf.setMask(mask);
|
||||
frame.setMask(mask);
|
||||
}
|
||||
flusher.enqueue(frame, callback, batch);
|
||||
flusher.iterate();
|
||||
|
||||
if (flusher.enqueue(frame, callback, batch))
|
||||
flusher.iterate();
|
||||
}
|
||||
|
||||
private class Flusher extends FrameFlusher
|
||||
|
@ -615,7 +574,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
|||
public void onCompleteFailure(Throwable x)
|
||||
{
|
||||
super.onCompleteFailure(x);
|
||||
channel.processConnectionError(x,NOOP);
|
||||
channel.processConnectionError(x, NOOP);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -118,7 +118,6 @@ public class WebSocketCloseTest extends WebSocketTester
|
|||
server.handler.getCoreSession().demand(1);
|
||||
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
|
||||
Frame frame = serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS);
|
||||
assertNotNull(frame);
|
||||
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL));
|
||||
|
||||
assertThat(server.handler.getCoreSession().toString(), containsString("ISHUT"));
|
||||
|
@ -143,9 +142,8 @@ public class WebSocketCloseTest extends WebSocketTester
|
|||
}
|
||||
|
||||
server.sendFrame(CloseStatus.toFrame(CloseStatus.NORMAL));
|
||||
Frame frame = receiveFrame(client.getInputStream());
|
||||
assertNotNull(frame);
|
||||
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL));
|
||||
CloseStatus closeStatus = new CloseStatus(receiveFrame(client.getInputStream()));
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.NORMAL));
|
||||
|
||||
assertThat(server.handler.getCoreSession().toString(), containsString("OSHUT"));
|
||||
LOG.info("Server: OSHUT");
|
||||
|
@ -162,7 +160,6 @@ public class WebSocketCloseTest extends WebSocketTester
|
|||
|
||||
server.handler.receivedCallback.poll().succeeded();
|
||||
Frame frame = receiveFrame(client.getInputStream());
|
||||
assertNotNull(frame);
|
||||
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL));
|
||||
|
||||
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
|
||||
|
@ -177,7 +174,6 @@ public class WebSocketCloseTest extends WebSocketTester
|
|||
server.sendFrame(CloseStatus.toFrame(CloseStatus.SHUTDOWN));
|
||||
server.handler.receivedCallback.poll().succeeded();
|
||||
Frame frame = receiveFrame(client.getInputStream());
|
||||
assertNotNull(frame);
|
||||
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.SHUTDOWN));
|
||||
|
||||
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
|
||||
|
@ -190,14 +186,27 @@ public class WebSocketCloseTest extends WebSocketTester
|
|||
setup(State.ISHUT);
|
||||
server.handler.receivedCallback.poll().failed(new Exception("test failure"));
|
||||
|
||||
Frame frame = receiveFrame(client.getInputStream());
|
||||
assertNotNull(frame);
|
||||
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
CloseStatus closeStatus = new CloseStatus(receiveFrame(client.getInputStream()));
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
assertThat(closeStatus.getReason(), is("test failure"));
|
||||
|
||||
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
|
||||
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void clientClosesOutput_ISHUT() throws Exception
|
||||
{
|
||||
setup(State.ISHUT);
|
||||
|
||||
client.shutdownOutput();
|
||||
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
|
||||
server.handler.receivedCallback.poll().succeeded();
|
||||
|
||||
CloseStatus closeStatus = new CloseStatus(receiveFrame(client.getInputStream()));
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.NORMAL));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void clientClose_OSHUT() throws Exception
|
||||
{
|
||||
|
@ -276,11 +285,57 @@ public class WebSocketCloseTest extends WebSocketTester
|
|||
setup(State.ISHUT);
|
||||
|
||||
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false));
|
||||
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
|
||||
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
|
||||
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.PROTOCOL));
|
||||
|
||||
server.close();
|
||||
Frame frame = receiveFrame(client.getInputStream());
|
||||
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.PROTOCOL));
|
||||
receiveEof(client.getInputStream());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void clientHalfClose_ISHUT() throws Exception
|
||||
{
|
||||
setup(State.ISHUT);
|
||||
|
||||
client.shutdownOutput();
|
||||
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
|
||||
Callback callback = server.handler.receivedCallback.poll(5, TimeUnit.SECONDS);
|
||||
|
||||
callback.succeeded();
|
||||
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
|
||||
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
|
||||
|
||||
Frame frame = receiveFrame(client.getInputStream());
|
||||
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NORMAL));
|
||||
receiveEof(client.getInputStream());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void clientCloseServerWrite_ISHUT() throws Exception
|
||||
{
|
||||
setup(State.ISHUT);
|
||||
|
||||
client.close();
|
||||
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
|
||||
|
||||
while(true)
|
||||
{
|
||||
if (!server.isOpen())
|
||||
break;
|
||||
|
||||
Callback callback = Callback.from(()->System.err.println("Succeeded Frame After Close"),
|
||||
(t)->System.err.println("Failed Frame After Close"));
|
||||
server.sendFrame(new Frame(OpCode.TEXT, BufferUtil.toBuffer("frame after close")), callback);
|
||||
}
|
||||
|
||||
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
|
||||
assertNotNull(server.handler.error);
|
||||
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
|
||||
Callback callback = server.handler.receivedCallback.poll(5, TimeUnit.SECONDS);
|
||||
callback.succeeded();
|
||||
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -293,7 +348,6 @@ public class WebSocketCloseTest extends WebSocketTester
|
|||
server.handler.getCoreSession().demand(1);
|
||||
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
|
||||
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
|
||||
assertThat(server.handler.closeStatus.getReason(), containsString("IOException"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -306,7 +360,6 @@ public class WebSocketCloseTest extends WebSocketTester
|
|||
server.handler.getCoreSession().demand(1);
|
||||
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
|
||||
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
|
||||
assertThat(server.handler.closeStatus.getReason(), containsString("IOException"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -362,6 +415,7 @@ public class WebSocketCloseTest extends WebSocketTester
|
|||
|
||||
protected BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
|
||||
protected BlockingQueue<Callback> receivedCallback = new BlockingArrayQueue<>();
|
||||
protected volatile Throwable error = null;
|
||||
protected CountDownLatch opened = new CountDownLatch(1);
|
||||
protected CountDownLatch closed = new CountDownLatch(1);
|
||||
protected CloseStatus closeStatus = null;
|
||||
|
@ -410,6 +464,7 @@ public class WebSocketCloseTest extends WebSocketTester
|
|||
public void onError(Throwable cause)
|
||||
{
|
||||
LOG.info("onError {} ", cause == null?null:cause.toString());
|
||||
error = cause;
|
||||
state = session.toString();
|
||||
}
|
||||
|
||||
|
@ -477,6 +532,11 @@ public class WebSocketCloseTest extends WebSocketTester
|
|||
handler.getCoreSession().sendFrame(frame, NOOP, false);
|
||||
}
|
||||
|
||||
public void sendFrame(Frame frame, Callback callback)
|
||||
{
|
||||
handler.getCoreSession().sendFrame(frame, callback, false);
|
||||
}
|
||||
|
||||
public void sendText(String line)
|
||||
{
|
||||
LOG.info("sending {}...", line);
|
||||
|
|
|
@ -18,6 +18,13 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.core;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.Socket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.io.ArrayByteBufferPool;
|
||||
|
@ -27,13 +34,6 @@ import org.eclipse.jetty.util.BufferUtil;
|
|||
import org.eclipse.jetty.websocket.core.internal.Parser;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.Socket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
@ -124,4 +124,18 @@ public class WebSocketTester
|
|||
return frame;
|
||||
}
|
||||
}
|
||||
|
||||
protected void receiveEof(InputStream in) throws IOException
|
||||
{
|
||||
ByteBuffer buffer = bufferPool.acquire(4096, false);
|
||||
while (true)
|
||||
{
|
||||
BufferUtil.flipToFill(buffer);
|
||||
int len = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
|
||||
if (len < 0)
|
||||
return;
|
||||
|
||||
throw new IllegalStateException("unexpected content");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue