Fix broken test in OutgoingMessageCapture.

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2021-08-18 13:15:16 +10:00
parent 5236e47c42
commit 65ff0bb081
2 changed files with 15 additions and 9 deletions

View File

@ -91,9 +91,9 @@ public class ByteBufferMessageSink extends AbstractMessageSink
{ {
bufferPool.release(buffer); bufferPool.release(buffer);
} }
session.demand(1);
} }
session.demand(1);
} }
catch (Throwable t) catch (Throwable t)
{ {

View File

@ -20,6 +20,8 @@ import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.toolchain.test.Hex; import org.eclipse.jetty.toolchain.test.Hex;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.CloseStatus;
@ -40,6 +42,7 @@ public class OutgoingMessageCapture extends CoreSession.Empty implements CoreSes
public BlockingQueue<ByteBuffer> binaryMessages = new LinkedBlockingDeque<>(); public BlockingQueue<ByteBuffer> binaryMessages = new LinkedBlockingDeque<>();
public BlockingQueue<String> events = new LinkedBlockingDeque<>(); public BlockingQueue<String> events = new LinkedBlockingDeque<>();
private final ByteBufferPool bufferPool = new NullByteBufferPool();
private final MethodHandle wholeTextHandle; private final MethodHandle wholeTextHandle;
private final MethodHandle wholeBinaryHandle; private final MethodHandle wholeBinaryHandle;
private MessageSink messageSink; private MessageSink messageSink;
@ -116,16 +119,19 @@ public class OutgoingMessageCapture extends CoreSession.Empty implements CoreSes
if (OpCode.isDataFrame(frame.getOpCode())) if (OpCode.isDataFrame(frame.getOpCode()))
{ {
messageSink.accept(Frame.copy(frame), callback); Frame copy = Frame.copy(frame);
messageSink.accept(copy, Callback.NOOP);
if (frame.isFin()) if (frame.isFin())
{
messageSink = null; messageSink = null;
}
}
else
{
callback.succeeded();
} }
callback.succeeded();
}
@Override
public ByteBufferPool getByteBufferPool()
{
return bufferPool;
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")