Issue #207 - reworking I/O to allow streaming backpressure

This commit is contained in:
Joakim Erdfelt 2017-04-06 15:58:35 -07:00
parent be747c44e1
commit aae4a24726
14 changed files with 291 additions and 383 deletions

View File

@ -121,13 +121,6 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*/
boolean isOpen();
/**
* Tests if the connection is actively reading.
*
* @return true if connection is actively attempting to read.
*/
boolean isReading();
/**
* Set the maximum number of milliseconds of idleness before the connection is closed/disconnected, (ie no frames are either sent or received)
* <p>

View File

@ -49,7 +49,13 @@ public class Parser
{
public interface Handler
{
void onFrame(Frame frame);
/**
* Notification of completely parsed frame.
*
* @param frame the frame
* @return true to continue parsing, false to stop parsing
*/
boolean onFrame(Frame frame);
}
private enum State
@ -182,64 +188,42 @@ public class Parser
{
return (flagsInUse & 0x10) != 0;
}
protected void notifyFrame(final Frame f)
public boolean parse(ByteBuffer buffer) throws WebSocketException
{
if (policy.getBehavior() == WebSocketBehavior.SERVER)
// TODO quick fail, nothing to parse
if (!buffer.hasRemaining())
{
/* Parsing on server.
*
* Then you MUST make sure all incoming frames are masked!
*
* Technically, this test is in violation of RFC-6455, Section 5.1
* http://tools.ietf.org/html/rfc6455#section-5.1
*
* But we can't trust the client at this point, so Jetty opts to close
* the connection as a Protocol error.
*/
if (!f.isMasked())
{
throw new ProtocolException("Client MUST mask all frames (RFC-6455: Section 5.1)");
}
}
else if(policy.getBehavior() == WebSocketBehavior.CLIENT)
{
// Required by RFC-6455 / Section 5.1
if (f.isMasked())
{
throw new ProtocolException("Server MUST NOT mask any frames (RFC-6455: Section 5.1)");
}
}
this.parserHandler.onFrame(f);
}
public void parse(ByteBuffer buffer) throws WebSocketException
{
if (buffer.remaining() <= 0)
{
return;
return true;
}
try
{
// parse through all the frames in the buffer
while (parseFrame(buffer))
{
if (LOG.isDebugEnabled())
LOG.debug("{} Parsed Frame: {}",policy.getBehavior(),frame);
notifyFrame(frame);
LOG.debug("{} Parsed Frame: {}", policy.getBehavior(), frame);
assertBehavior();
if (frame.isDataFrame())
{
priorDataFrame = !frame.isFin();
}
reset();
if(!this.parserHandler.onFrame(frame))
{
return false;
}
}
// completely consumed buffer
return true;
}
catch (Throwable t)
{
buffer.position(buffer.limit()); // consume remaining
reset();
// let session know
WebSocketException wse;
@ -251,16 +235,42 @@ public class Parser
throw wse;
}
}
private void reset()
private void assertBehavior()
{
if (frame != null)
frame.reset();
frame = null;
bufferPool.release(payload);
payload = null;
if (policy.getBehavior() == WebSocketBehavior.SERVER)
{
/* Parsing on server.
*
* Then you MUST make sure all incoming frames are masked!
*
* Technically, this test is in violation of RFC-6455, Section 5.1
* http://tools.ietf.org/html/rfc6455#section-5.1
*
* But we can't trust the client at this point, so Jetty opts to close
* the connection as a Protocol error.
*/
if (!frame.isMasked())
{
throw new ProtocolException("Client MUST mask all frames (RFC-6455: Section 5.1)");
}
}
else if(policy.getBehavior() == WebSocketBehavior.CLIENT)
{
// Required by RFC-6455 / Section 5.1
if (frame.isMasked())
{
throw new ProtocolException("Server MUST NOT mask any frames (RFC-6455: Section 5.1)");
}
}
}
public void release(Frame frame)
{
if (frame.hasPayload())
bufferPool.release(frame.getPayload());
}
/**
* Parse the base framing protocol buffer.
*
@ -602,17 +612,21 @@ public class Parser
payload = bufferPool.acquire(payloadLength,false);
BufferUtil.clearToFill(payload);
}
// Copy the payload.
payload.put(window);
// if the payload is complete
if (payload.position() == payloadLength)
{
BufferUtil.flipToFlush(payload, 0);
frame.setPayload(payload);
// notify that frame is complete
return true;
}
}
}
// frame not (yet) complete
return false;
}

View File

@ -28,7 +28,6 @@ import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
@ -63,8 +62,6 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
*/
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable, Parser.Handler
{
private final AtomicBoolean closed = new AtomicBoolean();
private class Flusher extends FrameFlusher
{
private Flusher(ByteBufferPool bufferPool, int bufferSize, Generator generator, EndPoint endpoint)
@ -174,35 +171,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
public static class Stats
{
private AtomicLong countFillInterestedEvents = new AtomicLong(0);
private AtomicLong countOnFillableEvents = new AtomicLong(0);
private AtomicLong countFillableErrors = new AtomicLong(0);
public long getFillableErrorCount()
{
return countFillableErrors.get();
}
public long getFillInterestedCount()
{
return countFillInterestedEvents.get();
}
public long getOnFillableCount()
{
return countOnFillableEvents.get();
}
}
private enum ReadMode
{
PARSE,
DISCARD,
EOF
}
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
private static final Logger LOG_OPEN = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_OPEN");
private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_CLOSE");
@ -211,7 +179,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
*/
private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
private final ByteBufferPool bufferPool;
private final Scheduler scheduler;
private final Generator generator;
@ -219,15 +187,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final WebSocketPolicy policy;
private final WebSocketBehavior behavior;
private final AtomicBoolean suspendToken;
private final AtomicBoolean closed = new AtomicBoolean();
private final FrameFlusher flusher;
private final String id;
private final ExtensionStack extensionStack;
private List<ExtensionConfig> extensions;
private boolean isFilling;
private ByteBuffer networkBuffer;
private ByteBuffer prefillBuffer;
private ReadMode readMode = ReadMode.PARSE;
private IOState ioState;
private Stats stats = new Stats();
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack)
{
@ -342,13 +309,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
@Override
public void fillInterested()
{
stats.countFillInterestedEvents.incrementAndGet();
super.fillInterested();
}
@Override
public ByteBufferPool getBufferPool()
{
@ -417,23 +377,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return scheduler;
}
public Stats getStats()
{
return stats;
}
@Override
public boolean isOpen()
{
return !closed.get();
}
@Override
public boolean isReading()
{
return isFilling;
}
/**
* Physical connection disconnect.
* <p>
@ -505,69 +454,109 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
@Override
public void onFrame(Frame frame)
public boolean onFrame(Frame frame)
{
AtomicBoolean result = new AtomicBoolean(false);
extensionStack.incomingFrame(frame, new FrameCallback()
{
@Override
public void fail(Throwable cause)
{
// TODO: suspend
}
@Override
public void succeed()
{
// TODO: resume
parser.release(frame);
if(!result.compareAndSet(false,true))
{
// callback has been notified asynchronously
fillAndParse();
}
}
@Override
public void fail(Throwable cause)
{
parser.release(frame);
// notify session & endpoint
notifyError(cause);
}
});
if(result.compareAndSet(false, true))
{
// callback hasn't been notified yet
return false;
}
return true;
}
public void shutdown()
{
}
@Override
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable()",behavior);
stats.countOnFillableEvents.incrementAndGet();
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
networkBuffer = bufferPool.acquire(getInputBufferSize(),true);
fillAndParse();
}
private void fillAndParse()
{
try
{
isFilling = true;
if(readMode == ReadMode.PARSE)
while (true)
{
readMode = readParse(buffer);
}
else
{
readMode = readDiscard(buffer);
if (suspendToken.get())
{
return;
}
if (networkBuffer.hasRemaining())
{
if (!parser.parse(networkBuffer)) return;
}
// TODO: flip/fill?
int filled = getEndPoint().fill(networkBuffer);
if (filled < 0)
{
bufferPool.release(networkBuffer);
shutdown();
return;
}
if (filled == 0)
{
bufferPool.release(networkBuffer);
fillInterested();
return;
}
if (!parser.parse(networkBuffer)) return;
}
}
finally
catch (IOException e)
{
bufferPool.release(buffer);
LOG.warn(e);
close(StatusCode.PROTOCOL,e.getMessage());
}
if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
catch (CloseException e)
{
fillInterested();
LOG.debug(e);
close(e.getStatusCode(),e.getMessage());
}
else
catch (Throwable t)
{
isFilling = false;
LOG.warn(t);
close(StatusCode.ABNORMAL,t.getMessage());
}
}
@Override
protected void onFillInterestedFailed(Throwable cause)
{
LOG.ignore(cause);
stats.countFillInterestedEvents.incrementAndGet();
super.onFillInterestedFailed(cause);
}
/**
* Extra bytes from the initial HTTP upgrade that need to
* be processed by the websocket parser before starting
@ -583,9 +572,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
prefillBuffer = prefilled;
}
private void notifyError(Throwable t)
private void notifyError(Throwable cause)
{
extensionStack.incomingError(t);
extensionStack.incomingError(cause);
}
@Override
@ -643,45 +632,15 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
flusher.enqueue(frame,callback,batchMode);
}
private ReadMode readDiscard(ByteBuffer buffer)
{
EndPoint endPoint = getEndPoint();
try
{
while (true)
{
int filled = endPoint.fill(buffer);
if (filled == 0)
{
return ReadMode.DISCARD;
}
else if (filled < 0)
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("read - EOF Reached (remote: {})",getRemoteAddress());
return ReadMode.EOF;
}
else
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
}
}
catch (IOException e)
{
LOG.ignore(e);
return ReadMode.EOF;
}
catch (Throwable t)
{
LOG.ignore(t);
return ReadMode.DISCARD;
}
}
private ReadMode readParse(ByteBuffer buffer)
/**
* Read from Endpoint and parse bytes.
*
* @param buffer
* @return
*/
@Deprecated
private int readParse(ByteBuffer buffer)
{
EndPoint endPoint = getEndPoint();
try
@ -694,12 +653,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
ioState.onReadFailure(new EOFException("Remote Read EOF"));
return ReadMode.EOF;
return filled;
}
else if (filled == 0)
{
// Done reading, wait for next onFillable
return ReadMode.PARSE;
return filled;
}
if (LOG.isDebugEnabled())
@ -714,33 +673,27 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
LOG.warn(e);
close(StatusCode.PROTOCOL,e.getMessage());
return ReadMode.DISCARD;
return -1;
}
catch (CloseException e)
{
LOG.debug(e);
close(e.getStatusCode(),e.getMessage());
return ReadMode.DISCARD;
return -1;
}
catch (Throwable t)
{
LOG.warn(t);
close(StatusCode.ABNORMAL,t.getMessage());
// TODO: should ws only switch to discard if a non-ws-endpoint error?
return ReadMode.DISCARD;
return -1;
}
}
@Override
public void resume()
{
if (suspendToken.getAndSet(false))
{
if (!isReading())
{
fillInterested();
}
}
suspendToken.set(false);
fillAndParse();
}
/**

View File

@ -90,6 +90,7 @@ public class InputStreamMessageSink implements MessageSink
LOG.debug("dispatch complete await() - {}", stream);
try
{
// TODO: remove
dispatchCompleted.await();
}
catch (InterruptedException e)

View File

@ -20,11 +20,10 @@ package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.log.Log;
@ -42,180 +41,144 @@ public class MessageInputStream extends InputStream implements MessageSink
{
private static final Logger LOG = Log.getLogger(MessageInputStream.class);
private static final FrameCallbackBuffer EOF = new FrameCallbackBuffer(new FrameCallback.Adapter(), ByteBuffer.allocate(0).asReadOnlyBuffer());
private final BlockingDeque<FrameCallbackBuffer> buffers = new LinkedBlockingDeque<>();
private final Deque<FrameCallbackBuffer> buffers = new ArrayDeque<>(2);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final long timeoutMs;
private final CountDownLatch closedLatch = new CountDownLatch(1);
private FrameCallbackBuffer activeBuffer = null;
public MessageInputStream()
{
this(-1);
}
public MessageInputStream(int timeoutMs)
{
this.timeoutMs = timeoutMs;
}
@Override
public void accept(Frame frame, FrameCallback callback)
{
if (LOG.isDebugEnabled())
{
LOG.debug("Appending {}", frame);
LOG.debug("accepting {}", frame);
}
// If closed, we should just toss incoming payloads into the bit bucket.
if (closed.get())
{
callback.fail(new IOException("Already Closed"));
return;
}
// Put the payload into the queue, by copying it.
// Copying is necessary because the payload will
// be processed after this method returns.
try
if (!frame.hasPayload() && !frame.isFin())
{
callback.succeed();
return;
}
synchronized (buffers)
{
if (!frame.hasPayload())
{
// skip if no payload
callback.succeed();
return;
}
ByteBuffer payload = frame.getPayload();
int capacity = payload.remaining();
if (capacity <= 0)
{
// skip if no payload data to copy
callback.succeed();
return;
}
buffers.offer(new FrameCallbackBuffer(callback, payload));
// TODO: the copy buffer should be pooled too, but no buffer pool available from here.
ByteBuffer copy = payload.isDirect() ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
copy.put(payload).flip();
buffers.put(new FrameCallbackBuffer(callback,copy));
// TODO: backpressure
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
finally
{
if (frame.isFin())
{
buffers.offer(EOF);
}
// notify other thread
buffers.notify();
}
}
@Override
public void close() throws IOException
{
if (closed.compareAndSet(false, true))
{
buffers.offer(EOF);
super.close();
closedLatch.countDown();
synchronized (buffers)
{
buffers.offer(EOF);
buffers.notify();
}
}
super.close();
}
private void shutdown()
{
closed.set(true);
// Removed buffers that may have remained in the queue.
buffers.clear();
}
@Override
public void mark(int readlimit)
{
// Not supported.
}
@Override
public boolean markSupported()
{
return false;
}
@Override
public int read() throws IOException
{
try
byte buf[] = new byte[1];
while (true)
{
if (closed.get())
{
if (LOG.isDebugEnabled())
LOG.debug("Stream closed");
int len = read(buf, 0, 1);
if (len < 0) // EOF
return -1;
}
// grab a fresh buffer
while (activeBuffer == null || !activeBuffer.buffer.hasRemaining())
{
if (LOG.isDebugEnabled())
LOG.debug("Waiting {} ms to read", timeoutMs);
if (timeoutMs < 0)
{
// Wait forever until a buffer is available.
// TODO: notify connection to resume (if paused)
activeBuffer = buffers.take();
}
else
{
// Wait at most for the given timeout.
activeBuffer = buffers.poll(timeoutMs, TimeUnit.MILLISECONDS);
if (activeBuffer == null)
{
throw new IOException(String.format("Read timeout: %,dms expired", timeoutMs));
}
}
if (activeBuffer == EOF)
{
if (LOG.isDebugEnabled())
LOG.debug("Reached EOF");
// Be sure that this stream cannot be reused.
closed.set(true);
closedLatch.countDown();
// Removed buffers that may have remained in the queue.
buffers.clear();
return -1;
}
}
return activeBuffer.buffer.get() & 0xFF;
}
catch (InterruptedException x)
{
if (LOG.isDebugEnabled())
LOG.debug("Interrupted while waiting to read", x);
closed.set(true);
closedLatch.countDown();
return -1;
if (len > 0) // did read something
return buf[0];
// reading nothing (len == 0) tries again
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
if (closed.get())
{
if (LOG.isDebugEnabled())
LOG.debug("Stream closed");
return -1;
}
// sync and poll queue
FrameCallbackBuffer result;
synchronized (buffers)
{
try
{
while ((result = buffers.poll()) == null)
{
// TODO: handle read timeout here?
buffers.wait();
}
}
catch (InterruptedException e)
{
shutdown();
throw new InterruptedIOException();
}
}
if (result == EOF)
{
shutdown();
return -1;
}
// We have content
int fillLen = Math.min(result.buffer.remaining(), len);
result.buffer.get(b, off, fillLen);
if (!result.buffer.hasRemaining())
{
result.callback.succeed();
}
// return number of bytes actually copied into buffer
return fillLen;
}
@Override
public void reset() throws IOException
{
throw new IOException("reset() not supported");
}
// TODO: remove await!
@Deprecated
public void awaitClose()
{
try
{
closedLatch.await();
}
catch (InterruptedException e)
{
throw new RuntimeException("Stream Close wait interrupted", e);
}
}
}

View File

@ -44,11 +44,4 @@ public class MessageReader extends InputStreamReader implements MessageSink
{
this.stream.accept(frame, callback);
}
// TODO: remove await!
@Deprecated
public void awaitClose()
{
stream.awaitClose();
}
}

View File

@ -60,7 +60,11 @@ public class ExtensionTool
Assert.assertThat("extClass", extClass, notNullValue());
this.capture = new IncomingFramesCapture();
this.parser = new UnitParser(policy,frame -> ext.incomingFrame(frame, new FrameCallbackAdapter()));
this.parser = new UnitParser(policy, frame ->
{
ext.incomingFrame(frame, new FrameCallbackAdapter());
return true;
});
}
public String getRequestedExtParams()

View File

@ -45,7 +45,6 @@ import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
@ -87,7 +86,11 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
// Wire up stack
ext.setNextIncomingFrames(capture);
Parser parser = new UnitParser(policy, frame -> ext.incomingFrame(frame, new FrameCallbackAdapter()));
Parser parser = new UnitParser(policy, (frame) ->
{
ext.incomingFrame(frame, new FrameCallback.Adapter());
return true;
});
parser.configureFromExtensions(Collections.singletonList(ext));
parser.parse(ByteBuffer.wrap(raw));

View File

@ -161,12 +161,6 @@ public class LocalWebSocketConnection implements LogicalConnection, ConnectionSt
return getIOState().isOpen();
}
@Override
public boolean isReading()
{
return false;
}
@Override
public void onConnectionStateChange(ConnectionState state)
{

View File

@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
@ -46,7 +47,7 @@ public class MessageInputStreamTest
@Rule
public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test");
@Test(timeout=10000)
@Test(timeout=5000)
public void testBasicAppendRead() throws IOException
{
try (MessageInputStream stream = new MessageInputStream())
@ -58,9 +59,8 @@ public class MessageInputStreamTest
stream.accept(frame, new FrameCallback.Adapter());
// Read entire message it from the stream.
byte buf[] = new byte[32];
int len = stream.read(buf);
String message = new String(buf,0,len,StandardCharsets.UTF_8);
byte data[] = IO.readBytes(stream);
String message = new String(data,0,data.length,StandardCharsets.UTF_8);
// Test it
Assert.assertThat("Message",message,is("Hello World"));
@ -104,9 +104,8 @@ public class MessageInputStreamTest
startLatch.await();
// Read it from the stream.
byte buf[] = new byte[32];
int len = stream.read(buf);
String message = new String(buf,0,len,StandardCharsets.UTF_8);
byte data[] = IO.readBytes(stream);
String message = new String(data,0,data.length,StandardCharsets.UTF_8);
// Test it
Assert.assertThat("Error when appending",hadError.get(),is(false));
@ -114,7 +113,7 @@ public class MessageInputStreamTest
}
}
@Test(timeout=10000)
@Test(timeout=5000)
public void testBlockOnReadInitial() throws IOException
{
try (MessageInputStream stream = new MessageInputStream())
@ -150,7 +149,7 @@ public class MessageInputStreamTest
}
}
@Test(timeout=10000)
@Test(timeout=5000)
public void testReadByteNoBuffersClosed() throws IOException
{
try (MessageInputStream stream = new MessageInputStream())
@ -164,14 +163,10 @@ public class MessageInputStreamTest
TimeUnit.MILLISECONDS.sleep(400);
stream.close();
}
catch (InterruptedException e)
catch (Throwable t)
{
hadError.set(true);
e.printStackTrace(System.err);
}
catch (IOException e)
{
e.printStackTrace(System.err);
t.printStackTrace(System.err);
}
}).start();
@ -180,12 +175,12 @@ public class MessageInputStreamTest
// Should be a -1, indicating the end of the stream.
// Test it
Assert.assertThat("Error when appending",hadError.get(),is(false));
Assert.assertThat("Error when closing",hadError.get(),is(false));
Assert.assertThat("Initial byte (Should be EOF)",b,is(-1));
}
}
@Test(timeout=10000)
@Test(timeout=5000)
public void testAppendEmptyPayloadRead() throws IOException
{
try (MessageInputStream stream = new MessageInputStream())
@ -201,16 +196,15 @@ public class MessageInputStreamTest
stream.accept(msg3, new FrameCallback.Adapter());
// Read entire message it from the stream.
byte buf[] = new byte[32];
int len = stream.read(buf);
String message = new String(buf,0,len,StandardCharsets.UTF_8);
byte data[] = IO.readBytes(stream);
String message = new String(data,0,data.length,StandardCharsets.UTF_8);
// Test it
Assert.assertThat("Message",message,is("Hello World"));
}
}
@Test(timeout=10000)
@Test(timeout=5000)
public void testAppendNullPayloadRead() throws IOException
{
try (MessageInputStream stream = new MessageInputStream())
@ -227,9 +221,8 @@ public class MessageInputStreamTest
stream.accept(msg3, new FrameCallback.Adapter());
// Read entire message it from the stream.
byte buf[] = new byte[32];
int len = stream.read(buf);
String message = new String(buf,0,len,StandardCharsets.UTF_8);
byte data[] = IO.readBytes(stream);
String message = new String(data,0,data.length,StandardCharsets.UTF_8);
// Test it
Assert.assertThat("Message",message,is("Hello World"));

View File

@ -626,9 +626,10 @@ public class BlockheadClient implements OutgoingFrames, ConnectionStateListener,
}
@Override
public void onFrame(Frame frame)
public boolean onFrame(Frame frame)
{
// TODO
// TODO: do something with frame?
return true;
}
public EventQueue<WebSocketFrame> readFrames(int expectedFrameCount, int timeoutDuration, TimeUnit timeoutUnit) throws Exception

View File

@ -101,7 +101,11 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
this.policy.setMaxTextMessageSize(100000);
// This is a blockhead server connection, no point tracking leaks on this object.
this.bufferPool = new MappedByteBufferPool(BUFFER_SIZE);
this.parser = new Parser(policy,bufferPool,frame -> extensionStack.incomingFrame(frame, new FrameCallbackAdapter()));
this.parser = new Parser(policy,bufferPool, frame ->
{
extensionStack.incomingFrame(frame, new FrameCallbackAdapter());
return true;
});
this.parseCount = new AtomicInteger(0);
this.generator = new Generator(policy,bufferPool,false);
this.extensionRegistry = new WebSocketExtensionFactory(new SimpleContainerScope(policy,bufferPool));

View File

@ -126,12 +126,6 @@ public class DummyConnection implements LogicalConnection
return false;
}
@Override
public boolean isReading()
{
return false;
}
@Override
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{

View File

@ -25,8 +25,6 @@ import java.util.Queue;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -38,7 +36,6 @@ import org.junit.Assert;
public class IncomingFramesCapture implements Parser.Handler, IncomingFrames
{
private static final Logger LOG = Log.getLogger(IncomingFramesCapture.class);
private EventQueue<WebSocketFrame> frames = new EventQueue<>();
private EventQueue<Throwable> errors = new EventQueue<>();
@ -161,10 +158,11 @@ public class IncomingFramesCapture implements Parser.Handler, IncomingFrames
}
@Override
public void onFrame(Frame frame)
public boolean onFrame(Frame frame)
{
WebSocketFrame copy = WebSocketFrame.copy(frame);
frames.add(copy);
return true;
}
public int size()