Issue #4341 - do not modify payload ByteBuffer sending WebSocket frames (#4359)

- test that Frame buffer is unchanged after sending
- Generator and FrameFlusher no longer modify frame payload
- permessage-deflate no longer modify frame payload
- remove Buffer allocation from Generator
- outgoing autoFragment no longer modifies buffer
- minor cleanups and optimizations from review

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan 2019-11-29 08:41:20 +11:00 committed by GitHub
parent 3b817821e7
commit acbc87a5da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 322 additions and 275 deletions

View File

@ -117,6 +117,7 @@ public class MessageOutputStream extends OutputStream
frame.setPayload(buffer);
frame.setFin(fin);
int initialBufferSize = buffer.remaining();
try (SharedBlockingCallback.Blocker b = blocker.acquire())
{
coreSession.sendFrame(frame, b, false);
@ -127,8 +128,8 @@ public class MessageOutputStream extends OutputStream
// Any flush after the first will be a CONTINUATION frame.
frame = new Frame(OpCode.CONTINUATION);
// Buffer has been sent, buffer should have been consumed
assert buffer.remaining() == 0;
// Buffer has been sent, but buffer should not have been consumed.
assert buffer.remaining() == initialBufferSize;
BufferUtil.clearToFill(buffer);
}

View File

@ -91,12 +91,11 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
public ByteBuffer asNetworkBuffer(List<Frame> frames)
{
int bufferLength = frames.stream().mapToInt((f) -> f.getPayloadLength() + Generator.MAX_HEADER_LENGTH).sum();
ByteBuffer buffer = ByteBuffer.allocate(bufferLength);
ByteBuffer buffer = BufferUtil.allocate(bufferLength);
for (Frame f : frames)
{
generator.generate(buffer, f);
}
BufferUtil.flipToFlush(buffer, 0);
return buffer;
}

View File

@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.Frame;
@ -40,7 +39,6 @@ public class UnitGenerator extends Generator
public UnitGenerator(Behavior behavior)
{
super(new MappedByteBufferPool());
applyMask = (behavior == Behavior.CLIENT);
}

View File

@ -24,10 +24,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.ConnectionStatistics;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@ -112,12 +110,10 @@ public class WebSocketStatsTest
long getFrameByteSize(Frame frame)
{
ByteBufferPool bufferPool = new MappedByteBufferPool();
Generator generator = new Generator(bufferPool);
ByteBuffer buffer = bufferPool.acquire(frame.getPayloadLength() + 10, true);
int pos = BufferUtil.flipToFill(buffer);
generator.generateWholeFrame(frame, buffer);
return buffer.position() - pos;
Generator generator = new Generator();
ByteBuffer headerBuffer = BufferUtil.allocate(Generator.MAX_HEADER_LENGTH);
generator.generateHeader(frame, headerBuffer);
return headerBuffer.remaining() + frame.getPayloadLength();
}
@Test

View File

@ -37,6 +37,7 @@ public abstract class FragmentingFlusher extends TransformingFlusher
private static final Logger LOG = Log.getLogger(FragmentingFlusher.class);
private final Configuration configuration;
private FrameEntry current;
private ByteBuffer payload;
public FragmentingFlusher(Configuration configuration)
{
@ -56,9 +57,14 @@ public abstract class FragmentingFlusher extends TransformingFlusher
}
current = new FrameEntry(frame, callback, batch);
payload = frame.getPayload().slice();
boolean finished = fragment(callback, true);
if (finished)
{
current = null;
payload = null;
}
return finished;
}
@ -67,14 +73,16 @@ public abstract class FragmentingFlusher extends TransformingFlusher
{
boolean finished = fragment(callback, false);
if (finished)
{
current = null;
payload = null;
}
return finished;
}
private boolean fragment(Callback callback, boolean first)
{
Frame frame = current.frame;
ByteBuffer payload = frame.getPayload();
int remaining = payload.remaining();
long maxFrameSize = configuration.getMaxFrameSize();
int fragmentSize = (int)Math.min(remaining, maxFrameSize);
@ -87,7 +95,7 @@ public abstract class FragmentingFlusher extends TransformingFlusher
// If we don't need to fragment just forward with original payload.
if (finished)
{
fragment.setPayload(frame.getPayload());
fragment.setPayload(payload);
forwardFrame(fragment, callback, current.batch);
return true;
}

View File

@ -27,7 +27,6 @@ import java.nio.file.Path;
import java.util.Calendar;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.StringUtil;
@ -112,14 +111,12 @@ public class FrameCaptureExtension extends AbstractExtension
}
ByteBuffer buf = getBufferPool().acquire(BUFSIZE, false);
BufferUtil.flipToFill(buf);
try
{
Frame f = Frame.copy(frame);
f.setMask(null); // TODO is this needed?
generator.generateHeaderBytes(f, buf);
BufferUtil.flipToFlush(buf, 0);
generator.generateHeader(f, buf);
channel.write(buf);
if (frame.hasPayload())
{
@ -178,7 +175,7 @@ public class FrameCaptureExtension extends AbstractExtension
incomingChannel = Files.newByteChannel(incomingFramesPath, CREATE, WRITE);
outgoingChannel = Files.newByteChannel(outgoingFramesPath, CREATE, WRITE);
this.generator = new Generator(getBufferPool(), true);
this.generator = new Generator();
}
catch (IOException e)
{

View File

@ -64,6 +64,7 @@ public class FrameFlusher extends IteratingCallback
private final List<Entry> previousEntries;
private final List<Entry> failedEntries;
private List<ByteBuffer> releasableBuffers = new ArrayList<>();
private ByteBuffer batchBuffer;
private boolean canEnqueue = true;
private boolean flushed = true;
@ -200,6 +201,7 @@ public class FrameFlusher extends IteratingCallback
LOG.debug("Flushing {}", this);
boolean flush = false;
Callback releasingCallback = this;
synchronized (this)
{
if (closedCause != null)
@ -233,45 +235,64 @@ public class FrameFlusher extends IteratingCallback
if (batch)
{
// Acquire a batchBuffer if we don't have one
// Acquire a batchBuffer if we don't have one.
if (batchBuffer == null)
{
batchBuffer = acquireBuffer(bufferSize);
buffers.add(batchBuffer);
}
// generate the frame into the batchBuffer
entry.generateHeaderBytes(batchBuffer);
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
BufferUtil.append(batchBuffer, payload);
}
else if (batchBuffer != null && batchSpace >= Generator.MAX_HEADER_LENGTH)
{
// Use the batch space for our header
entry.generateHeaderBytes(batchBuffer);
flush = true;
// Add the payload to the list of buffers
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
buffers.add(payload);
// Generate the frame into the batchBuffer.
generator.generateWholeFrame(entry.frame, batchBuffer);
}
else
{
// Add headers and payload to the list of buffers
// TODO: release this buffer.
ByteBuffer buffer = acquireBuffer(Generator.MAX_HEADER_LENGTH);
buffers.add(buffer);
entry.generateHeaderBytes(buffer);
flush = true;
if (batchBuffer != null && batchSpace >= Generator.MAX_HEADER_LENGTH)
{
// Use the batch space for our header.
generator.generateHeader(entry.frame, batchBuffer);
}
else
{
// Add headers to the list of buffers.
ByteBuffer headerBuffer = acquireBuffer(Generator.MAX_HEADER_LENGTH);
releasableBuffers.add(headerBuffer);
generator.generateHeader(entry.frame, headerBuffer);
buffers.add(headerBuffer);
}
// Add the payload to the list of buffers.
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
buffers.add(payload);
{
if (entry.frame.isMasked())
{
payload = acquireBuffer(entry.frame.getPayloadLength());
releasableBuffers.add(payload);
generator.generatePayload(entry.frame, payload);
}
buffers.add(payload.slice());
}
flush = true;
}
flushed = flush;
}
// If we are going to flush we should release any buffers we have allocated after the callback completes.
if (flush)
{
final List<ByteBuffer> callbackBuffers = releasableBuffers;
releasableBuffers = new ArrayList<>();
releasingCallback = Callback.from(releasingCallback, () ->
{
for (ByteBuffer buffer : callbackBuffers)
{
bufferPool.release(buffer);
}
});
}
}
if (LOG.isDebugEnabled())
@ -288,7 +309,6 @@ public class FrameFlusher extends IteratingCallback
if (entry.frame.getOpCode() == OpCode.CLOSE)
endPoint.shutdownOutput();
notifyCallbackSuccess(entry.callback);
entry.release();
}
previousEntries.clear();
@ -310,7 +330,7 @@ public class FrameFlusher extends IteratingCallback
bufferArray[i++] = bb;
}
bytesOut.add(bytes);
endPoint.write(this, bufferArray);
endPoint.write(releasingCallback, bufferArray);
buffers.clear();
}
else
@ -397,6 +417,12 @@ public class FrameFlusher extends IteratingCallback
failedEntries.addAll(entries);
entries.clear();
for (ByteBuffer buffer : releasableBuffers)
{
bufferPool.release(buffer);
}
releasableBuffers.clear();
if (closedCause == null)
closedCause = failure;
else if (closedCause != failure)
@ -406,7 +432,6 @@ public class FrameFlusher extends IteratingCallback
for (Entry entry : failedEntries)
{
notifyCallbackFailure(entry.callback, failure);
entry.release();
}
failedEntries.clear();
@ -493,22 +518,6 @@ public class FrameFlusher extends IteratingCallback
super(frame, callback, batch);
}
private void generateHeaderBytes(ByteBuffer buffer)
{
int pos = BufferUtil.flipToFill(buffer);
generator.generateHeaderBytes(frame, buffer);
BufferUtil.flipToFlush(buffer, pos);
}
private void release()
{
if (headerBuffer != null)
{
generator.getBufferPool().release(headerBuffer);
headerBuffer = null;
}
}
private long getTimeOfCreation()
{
return timeOfCreation;

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.core.internal;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.Frame;
@ -54,57 +53,27 @@ public class Generator
* The overhead (maximum) for a framing header. Assuming a maximum sized payload with masking key.
*/
public static final int MAX_HEADER_LENGTH = 28;
private static byte[] mask = {0x00, (byte)0xF0, 0x0F, (byte)0xFF};
public static void putMask(ByteBuffer buffer)
{
buffer.put(mask, 0, mask.length);
}
public static void putPayload(ByteBuffer buffer, byte[] payload)
{
int len = payload.length;
for (int i = 0; i < len; i++)
{
buffer.put((byte)(payload[i] ^ mask[i % 4]));
}
}
private final ByteBufferPool bufferPool;
private final boolean readOnly;
/**
* Construct Generator with provided policy and bufferPool
*
* @param bufferPool the buffer pool to use
* Generate the whole frame (header + payload copy) into a single ByteBuffer.
* @param frame the frame to generate.
* @param buffer the buffer to output the generated frame to.
*/
public Generator(ByteBufferPool bufferPool)
public void generateWholeFrame(Frame frame, ByteBuffer buffer)
{
this(bufferPool, false);
generateHeader(frame, buffer);
generatePayload(frame, buffer);
}
/**
* Construct Generator with provided policy and bufferPool
*
* @param bufferPool the buffer pool to use
* Generate the header bytes of a frame into a single ByteBuffer.
* @param frame the frame to generate.
* @param buffer the buffer to output the generated frame to.
*/
public Generator(ByteBufferPool bufferPool, boolean readOnly)
public void generateHeader(Frame frame, ByteBuffer buffer)
{
this.bufferPool = bufferPool;
this.readOnly = readOnly;
}
int pos = BufferUtil.flipToFill(buffer);
public ByteBuffer generateHeaderBytes(Frame frame)
{
ByteBuffer buffer = bufferPool.acquire(MAX_HEADER_LENGTH, false);
BufferUtil.clearToFill(buffer);
generateHeaderBytes(frame, buffer);
BufferUtil.flipToFlush(buffer, 0);
return buffer;
}
public void generateHeaderBytes(Frame frame, ByteBuffer buffer)
{
/*
* start the generation process
*/
@ -177,10 +146,35 @@ public class Generator
}
// masking key
if (frame.isMasked() && !readOnly)
if (frame.isMasked())
buffer.put(frame.getMask());
BufferUtil.flipToFlush(buffer, pos);
}
/**
* Generate the payload of a frame into a single ByteBuffer, if the frame has a mask the payload
* will be masked as it is copied to the output buffer.
* @param frame the frame to generate.
* @param buffer the buffer to output the generated frame to.
*/
public void generatePayload(Frame frame, ByteBuffer buffer)
{
ByteBuffer payload = frame.getPayload();
if (!BufferUtil.hasContent(payload))
return;
int pos = BufferUtil.flipToFill(buffer);
if (frame.isMasked())
maskPayload(buffer, frame);
else
buffer.put(payload.slice());
BufferUtil.flipToFlush(buffer, pos);
}
private void maskPayload(ByteBuffer buffer, Frame frame)
{
byte[] mask = frame.getMask();
buffer.put(mask);
int maskInt = 0;
for (byte maskByte : mask)
{
@ -199,46 +193,16 @@ public class Generator
{
if (remaining >= 4)
{
payload.putInt(start, payload.getInt(start) ^ maskInt);
buffer.putInt(payload.getInt(start) ^ maskInt);
start += 4;
}
else
{
payload.put(start, (byte)(payload.get(start) ^ mask[maskOffset & 3]));
buffer.put((byte)(payload.get(start) ^ mask[maskOffset & 3]));
++start;
++maskOffset;
}
}
}
}
}
/**
* Generate the whole frame (header + payload copy) into a single ByteBuffer.
* <p>
* Note: This is slow, moves lots of memory around. Only use this if you must (such as in unit testing).
*
* @param frame the frame to generate
* @param buf the buffer to output the generated frame to
*/
public void generateWholeFrame(Frame frame, ByteBuffer buf)
{
generateHeaderBytes(frame, buf);
if (frame.hasPayload())
{
if (readOnly)
{
buf.put(frame.getPayload().slice());
}
else
{
buf.put(frame.getPayload());
}
}
}
public ByteBufferPool getBufferPool()
{
return bufferPool;
}
}

View File

@ -253,7 +253,7 @@ public class PerMessageDeflateExtension extends AbstractExtension
_batch = batch;
// Provide the frames payload as input to the Deflater.
getDeflater().setInput(frame.getPayload());
getDeflater().setInput(frame.getPayload().slice());
callback.succeeded();
return false;
}
@ -379,7 +379,7 @@ public class PerMessageDeflateExtension extends AbstractExtension
incomingCompressed = false;
// Provide the frames payload as input to the Inflater.
getInflater().setInput(_frame.getPayload());
getInflater().setInput(_frame.getPayload().slice());
callback.succeeded();
return false;
}

View File

@ -95,7 +95,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
this.coreSession = coreSession;
this.generator = new Generator(bufferPool);
this.generator = new Generator();
this.parser = new Parser(bufferPool, coreSession);
this.flusher = new Flusher(scheduler, coreSession.getOutputBufferSize(), generator, endp);
this.setInputBufferSize(coreSession.getInputBufferSize());

View File

@ -89,9 +89,11 @@ public class AutoFragmentTest
// Send a message which is too large.
int size = maxFrameSize * 2;
byte[] message = new byte[size];
Arrays.fill(message, 0, size, (byte)'X');
clientHandler.coreSession.sendFrame(new Frame(OpCode.BINARY, BufferUtil.toBuffer(message)), Callback.NOOP, false);
byte[] array = new byte[size];
Arrays.fill(array, 0, size, (byte)'X');
ByteBuffer message = BufferUtil.toBuffer(array);
Frame sentFrame = new Frame(OpCode.BINARY, BufferUtil.copy(message));
clientHandler.coreSession.sendFrame(sentFrame, Callback.NOOP, false);
// We should not receive any frames larger than the max frame size.
// So our message should be split into two frames.
@ -108,6 +110,9 @@ public class AutoFragmentTest
assertThat(frame.getPayloadLength(), is(maxFrameSize));
assertThat(frame.isFin(), is(true));
// Original frame payload should not have been changed.
assertThat(sentFrame.getPayload(), is(message));
clientHandler.sendClose();
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
@ -290,7 +295,8 @@ public class AutoFragmentTest
serverHandler.coreSession.setAutoFragment(true);
// Send the payload which should be fragmented by the server permessage-deflate.
serverHandler.sendFrame(new Frame(OpCode.BINARY, BufferUtil.copy(payload)), Callback.NOOP, false);
ByteBuffer sendPayload = BufferUtil.copy(payload);
serverHandler.sendFrame(new Frame(OpCode.BINARY, sendPayload), Callback.NOOP, false);
// Assemble the message from the fragmented frames.
ByteBuffer message = BufferUtil.allocate(payload.remaining()*2);
@ -309,6 +315,7 @@ public class AutoFragmentTest
// We received correct payload in 2 frames.
assertThat(message, is(payload));
assertThat(message, is(sendPayload));
assertThat(numFrames, is(2));
clientHandler.sendClose();

View File

@ -0,0 +1,116 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FrameBufferTest extends WebSocketTester
{
private WebSocketServer server;
private TestFrameHandler serverHandler = new TestFrameHandler();
private WebSocketCoreClient client;
private WebSocketComponents components = new WebSocketComponents();
@BeforeEach
public void startup() throws Exception
{
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverHandler);
server = new WebSocketServer(negotiator);
client = new WebSocketCoreClient(null, components);
server.start();
client.start();
}
@AfterEach
public void shutdown() throws Exception
{
server.start();
client.start();
}
@Test
public void testSingleFrame() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
CompletableFuture<CoreSession> connect = client.connect(clientHandler, server.getUri());
connect.get(5, TimeUnit.SECONDS);
ByteBuffer message = BufferUtil.toBuffer("hello world");
ByteBuffer sendPayload = BufferUtil.copy(message);
clientHandler.sendFrame(new Frame(OpCode.BINARY, sendPayload), Callback.NOOP, false);
Frame frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
assertThat(frame.getOpCode(), is(OpCode.BINARY));
assertThat(frame.getPayload(), is(message));
assertThat(sendPayload, is(message));
clientHandler.sendClose();
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
assertNull(clientHandler.getError());
}
@Test
public void testSendSameFrameMultipleTimes() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
client.connect(clientHandler, server.getUri()).get(5, TimeUnit.SECONDS);
serverHandler.open.await(5, TimeUnit.SECONDS);
clientHandler.coreSession.setAutoFragment(false);
serverHandler.coreSession.setAutoFragment(false);
int payloadLen = 32 * 1024;
byte[] array = new byte[payloadLen];
new Random().nextBytes(array);
ByteBuffer message = ByteBuffer.wrap(array);
Frame frame = new Frame(OpCode.BINARY, BufferUtil.copy(message));
for (int i = 0; i < 200; i++)
{
clientHandler.sendFrame(frame, Callback.NOOP, false);
Frame recvFrame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
assertThat(recvFrame.getOpCode(), is(OpCode.BINARY));
assertThat(recvFrame.getPayload(), is(message));
assertThat(frame.getPayload(), is(message));
}
clientHandler.sendClose();
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
assertNull(clientHandler.getError());
}
}

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.stream.Stream;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Generator;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
@ -71,8 +72,8 @@ public class GeneratorFrameFlagsTest
{
setup(invalidFrame);
ByteBuffer buffer = ByteBuffer.allocate(100);
new Generator(components.getBufferPool()).generateWholeFrame(invalidFrame, buffer);
ByteBuffer buffer = BufferUtil.allocate(100);
new Generator().generateWholeFrame(invalidFrame, buffer);
assertThrows(ProtocolException.class, () -> coreSession.assertValidOutgoing(invalidFrame));
}
}

View File

@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.core.internal.Generator;
import org.junit.jupiter.api.Test;
@ -32,14 +31,14 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GeneratorParserRoundtripTest
public class GeneratorParserRoundTripTest
{
private ByteBufferPool bufferPool = new MappedByteBufferPool();
@Test
public void testParserAndGenerator() throws Exception
{
Generator gen = new Generator(bufferPool);
Generator gen = new Generator();
ParserCapture capture = new ParserCapture();
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
@ -48,15 +47,11 @@ public class GeneratorParserRoundtripTest
try
{
// Generate Buffer
BufferUtil.flipToFill(out);
Frame frame = new Frame(OpCode.TEXT).setPayload(message);
ByteBuffer header = gen.generateHeaderBytes(frame);
ByteBuffer payload = frame.getPayload();
out.put(header);
out.put(payload);
gen.generateHeader(frame, out);
gen.generatePayload(frame, out);
// Parse Buffer
BufferUtil.flipToFlush(out, 0);
capture.parse(out);
}
finally
@ -72,13 +67,12 @@ public class GeneratorParserRoundtripTest
@Test
public void testParserAndGeneratorMasked() throws Exception
{
Generator gen = new Generator(bufferPool);
Generator gen = new Generator();
ParserCapture capture = new ParserCapture(true, Behavior.SERVER);
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
ByteBuffer out = bufferPool.acquire(8192, false);
BufferUtil.flipToFill(out);
try
{
// Setup Frame
@ -90,13 +84,10 @@ public class GeneratorParserRoundtripTest
frame.setMask(mask);
// Generate Buffer
ByteBuffer header = gen.generateHeaderBytes(frame);
ByteBuffer payload = frame.getPayload();
out.put(header);
out.put(payload);
gen.generateHeader(frame, out);
gen.generatePayload(frame, out);
// Parse Buffer
BufferUtil.flipToFlush(out, 0);
capture.parse(out);
}
finally

View File

@ -24,7 +24,6 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.toolchain.test.ByteBufferAssert;
import org.eclipse.jetty.toolchain.test.Hex;
import org.eclipse.jetty.util.BufferUtil;
@ -46,7 +45,7 @@ public class GeneratorTest
{
private static final Logger LOG = Log.getLogger(Helper.class);
private static Generator generator = new Generator(new MappedByteBufferPool());
private static Generator generator = new Generator();
private static WebSocketCoreSession coreSession = newWebSocketCoreSession(Behavior.SERVER);
private static WebSocketCoreSession newWebSocketCoreSession(Behavior behavior)
@ -1270,26 +1269,26 @@ public class GeneratorTest
completeBufSize += Generator.MAX_HEADER_LENGTH + f.getPayloadLength();
}
ByteBuffer completeBuf = ByteBuffer.allocate(completeBufSize);
BufferUtil.clearToFill(completeBuf);
ByteBuffer completeBuf = BufferUtil.allocate(completeBufSize);
// Generate from all frames
for (Frame f : frames)
{
ByteBuffer header = generator.generateHeaderBytes(f);
totalBytes += BufferUtil.put(header, completeBuf);
int remaining = completeBuf.remaining();
generator.generateHeader(f, completeBuf);
totalBytes += completeBuf.remaining() - remaining;
if (f.hasPayload())
remaining = completeBuf.remaining();
generator.generatePayload(f, completeBuf);
if (completeBuf.remaining() - remaining > 0)
{
ByteBuffer payload = f.getPayload();
totalBytes += payload.remaining();
totalBytes += completeBuf.remaining() - remaining;
totalParts++;
completeBuf.put(payload.slice());
}
}
// Return results
BufferUtil.flipToFlush(completeBuf, 0);
return completeBuf;
}
}
@ -1323,9 +1322,8 @@ public class GeneratorTest
private static ByteBuffer generate(Frame... frames)
{
int length = Arrays.stream(frames).mapToInt(frame -> frame.getPayloadLength() + Generator.MAX_HEADER_LENGTH).sum();
ByteBuffer buffer = ByteBuffer.allocate(length);
ByteBuffer buffer = BufferUtil.allocate(length);
Arrays.stream(frames).forEach(frame -> generator.generateWholeFrame(frame, buffer));
BufferUtil.flipToFlush(buffer, 0);
return buffer;
}
}

View File

@ -62,9 +62,8 @@ public class OutgoingNetworkBytesCapture implements OutgoingFrames
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
ByteBuffer buf = ByteBuffer.allocate(Generator.MAX_HEADER_LENGTH + frame.getPayloadLength());
ByteBuffer buf = BufferUtil.allocate(Generator.MAX_HEADER_LENGTH + frame.getPayloadLength());
generator.generateWholeFrame(frame, buf);
BufferUtil.flipToFlush(buf, 0);
captured.add(buf);
if (callback != null)
{

View File

@ -23,8 +23,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.core.internal.Generator;
@ -40,19 +38,15 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
*/
public class ParserReservedBitTest
{
private ByteBufferPool bufferPool = new MappedByteBufferPool();
private void expectProtocolException(List<Frame> frames)
{
ParserCapture parserCapture = new ParserCapture();
// generate raw bytebuffer of provided frames
int size = frames.stream().mapToInt(frame -> frame.getPayloadLength() + Generator.MAX_HEADER_LENGTH).sum();
Generator generator = new Generator();
ByteBuffer raw = BufferUtil.allocate(size);
BufferUtil.clearToFill(raw);
Generator generator = new Generator(bufferPool);
frames.forEach(frame -> generator.generateWholeFrame(frame, raw));
BufferUtil.flipToFlush(raw, 0);
// parse buffer
try (StacklessLogging ignore = new StacklessLogging(Parser.class))

View File

@ -25,7 +25,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.toolchain.test.Hex;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
@ -50,6 +49,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ParserTest
{
private static final int MAX_ALLOWED_FRAME_SIZE = 4 * 1024 * 1024;
private static final byte[] mask = {0x00, (byte)0xF0, 0x0F, (byte)0xFF};
public static void putPayload(ByteBuffer buffer, byte[] payload)
{
int len = payload.length;
for (int i = 0; i < len; i++)
{
buffer.put((byte)(payload[i] ^ mask[i % 4]));
}
}
private ParserCapture parse(Behavior behavior, int maxAllowedFrameSize, ByteBuffer buffer)
{
@ -83,8 +92,8 @@ public class ParserTest
buffer.put(b);
if (masked)
{
Generator.putMask(buffer);
Generator.putPayload(buffer, messageBytes);
buffer.put(mask);
putPayload(buffer, messageBytes);
}
else
{
@ -1296,8 +1305,8 @@ public class ParserTest
buf.put((byte)0x81); // text frame, fin = true
buf.put((byte)(0x80 | 0x7E)); // 0x7E == 126 (a 2 byte payload length)
buf.putShort((short)utf.length);
Generator.putMask(buf);
Generator.putPayload(buf, utf);
buf.put(mask);
putPayload(buf, utf);
buf.flip();
ParserCapture capture = new ParserCapture(true, Behavior.SERVER);
@ -1325,8 +1334,8 @@ public class ParserTest
buf.put((byte)0x81); // text frame, fin = true
buf.put((byte)(0x80 | 0x7F)); // 0x7F == 127 (a 8 byte payload length)
buf.putLong(utf.length);
Generator.putMask(buf);
Generator.putPayload(buf, utf);
buf.put(mask);
putPayload(buf, utf);
buf.flip();
ParserCapture capture = parse(Behavior.SERVER, 100000, buf, true);
@ -1397,8 +1406,8 @@ public class ParserTest
buf.put((byte)0x81);
buf.put((byte)(0x80 | 0x7E)); // 0x7E == 126 (a 2 byte payload length)
buf.putShort((short)utf.length);
Generator.putMask(buf);
Generator.putPayload(buf, utf);
buf.put(mask);
putPayload(buf, utf);
buf.flip();
ParserCapture capture = parse(Behavior.SERVER, MAX_ALLOWED_FRAME_SIZE, buf, true);
@ -1417,8 +1426,8 @@ public class ParserTest
ByteBuffer buf = ByteBuffer.allocate(24);
buf.put((byte)0x81);
buf.put((byte)(0x80 | utf.length));
Generator.putMask(buf);
Generator.putPayload(buf, utf);
buf.put(mask);
putPayload(buf, utf);
buf.flip();
ParserCapture capture = parse(Behavior.SERVER, MAX_ALLOWED_FRAME_SIZE, buf, true);
@ -1442,14 +1451,14 @@ public class ParserTest
// part 1
buf.put((byte)0x01); // no fin + text
buf.put((byte)(0x80 | b1.length));
Generator.putMask(buf);
Generator.putPayload(buf, b1);
buf.put(mask);
putPayload(buf, b1);
// part 2
buf.put((byte)0x80); // fin + continuation
buf.put((byte)(0x80 | b2.length));
Generator.putMask(buf);
Generator.putPayload(buf, b2);
buf.put(mask);
putPayload(buf, b2);
buf.flip();
@ -1473,8 +1482,8 @@ public class ParserTest
ByteBuffer buf = ByteBuffer.allocate(24);
buf.put((byte)0x81);
buf.put((byte)(0x80 | utf.length));
Generator.putMask(buf);
Generator.putPayload(buf, utf);
buf.put(mask);
putPayload(buf, utf);
buf.flip();
ParserCapture capture = parse(Behavior.SERVER, MAX_ALLOWED_FRAME_SIZE, buf, true);
@ -1650,13 +1659,12 @@ public class ParserTest
private ByteBuffer generate(Behavior behavior, List<Frame> frames)
{
Generator generator = new Generator(new MappedByteBufferPool());
Generator generator = new Generator();
int length = frames.stream().mapToInt(frame -> frame.getPayloadLength() + Generator.MAX_HEADER_LENGTH).sum();
ByteBuffer buffer = ByteBuffer.allocate(length);
ByteBuffer buffer = BufferUtil.allocate(length);
frames.stream()
.peek(frame -> maskIfClient(behavior, frame))
.forEach(frame -> generator.generateWholeFrame(frame, buffer));
BufferUtil.flipToFlush(buffer, 0);
return buffer;
}

View File

@ -20,14 +20,10 @@ package org.eclipse.jetty.websocket.core;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.toolchain.test.Hex;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.core.internal.Generator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
@ -36,50 +32,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class WebSocketFrameTest
{
public LeakTrackingByteBufferPool bufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged());
@AfterEach
public void afterEach()
{
String id = WebSocketFrameTest.class.getSimpleName();
assertThat("Leaked Acquires Count for [" + id + "]", bufferPool.getLeakedAcquires(), is(0L));
assertThat("Leaked Releases Count for [" + id + "]", bufferPool.getLeakedReleases(), is(0L));
assertThat("Leaked Resource Count for [" + id + "]", bufferPool.getLeakedResources(), is(0L));
}
private Generator generator;
private Generator generator = new Generator();
private ByteBuffer generateWholeFrame(Generator generator, Frame frame)
{
ByteBuffer buf = ByteBuffer.allocate(frame.getPayloadLength() + Generator.MAX_HEADER_LENGTH);
ByteBuffer buf = BufferUtil.allocate(frame.getPayloadLength() + Generator.MAX_HEADER_LENGTH);
generator.generateWholeFrame(frame, buf);
BufferUtil.flipToFlush(buf, 0);
return buf;
}
@BeforeEach
public void initGenerator()
{
generator = new Generator(bufferPool);
}
@AfterEach
public void verifyNoLeaks()
{
//TODO is this right and should it be other way around
bufferPool.clearTracking();
assertNoLeaks(bufferPool);
}
public void assertNoLeaks(LeakTrackingByteBufferPool pool)
{
String id = this.getClass().getName();
assertThat("Leaked Acquires Count for [" + id + "]", pool.getLeakedAcquires(), is(0L));
assertThat("Leaked Releases Count for [" + id + "]", pool.getLeakedReleases(), is(0L));
assertThat("Leaked Resource Count for [" + id + "]", pool.getLeakedResources(), is(0L));
}
private void assertFrameHex(String message, String expectedHex, ByteBuffer actual)
{
String actualHex = Hex.asHex(actual);

View File

@ -79,7 +79,7 @@ public class FrameFlusherTest
@Test
public void testPostCloseFrameCallbacks() throws ExecutionException, InterruptedException, TimeoutException
{
Generator generator = new Generator(bufferPool);
Generator generator = new Generator();
CapturingEndPoint endPoint = new CapturingEndPoint(bufferPool);
int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;
int maxGather = 1;
@ -110,7 +110,7 @@ public class FrameFlusherTest
@Test
public void testLargeSmallText() throws ExecutionException, InterruptedException
{
Generator generator = new Generator(bufferPool);
Generator generator = new Generator();
CapturingEndPoint endPoint = new CapturingEndPoint(bufferPool);
int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;
int maxGather = 8;
@ -161,7 +161,7 @@ public class FrameFlusherTest
@Test
public void testWriteTimeout() throws Exception
{
Generator generator = new Generator(bufferPool);
Generator generator = new Generator();
BlockingEndpoint endPoint = new BlockingEndpoint(bufferPool);
int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;
int maxGather = 8;
@ -193,7 +193,7 @@ public class FrameFlusherTest
@Test
public void testErrorClose() throws Exception
{
Generator generator = new Generator(bufferPool);
Generator generator = new Generator();
BlockingEndpoint endPoint = new BlockingEndpoint(bufferPool);
endPoint.setBlockTime(100);
int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;