Issue #4152 - make WS Parser autoFragment data frames to maxFrameSize (#4219)

* Issue #4152 - make WS Parser autoFragment data frames to maxFrameSize

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* Issue #4152 - fix broken tests

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* Issue #4152 - fix broken tests

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* changes from review

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan 2019-10-22 16:17:50 +11:00 committed by GitHub
parent c2cde806d1
commit f6f423f558
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 227 additions and 36 deletions

View File

@ -256,12 +256,17 @@ public class Parser
protected void checkFrameSize(byte opcode, int payloadLength) throws MessageTooLargeException, ProtocolException
{
if (OpCode.isControlFrame(opcode) && payloadLength > Frame.MAX_CONTROL_PAYLOAD)
throw new ProtocolException("Invalid control frame payload length, [" + payloadLength + "] cannot exceed [" + Frame.MAX_CONTROL_PAYLOAD + "]");
long maxFrameSize = configuration.getMaxFrameSize();
if (!configuration.isAutoFragment() && maxFrameSize > 0 && payloadLength > maxFrameSize)
throw new MessageTooLargeException("Cannot handle payload lengths larger than " + maxFrameSize);
if (OpCode.isControlFrame(opcode))
{
if (payloadLength > Frame.MAX_CONTROL_PAYLOAD)
throw new ProtocolException("Invalid control frame payload length, [" + payloadLength + "] cannot exceed [" + Frame.MAX_CONTROL_PAYLOAD + "]");
}
else
{
long maxFrameSize = configuration.getMaxFrameSize();
if (!configuration.isAutoFragment() && maxFrameSize > 0 && payloadLength > maxFrameSize)
throw new MessageTooLargeException("Cannot handle payload lengths larger than " + maxFrameSize);
}
}
protected ParsedFrame newFrame(byte firstByte, byte[] mask, ByteBuffer payload, boolean releaseable)
@ -279,6 +284,32 @@ public class Parser
return new ParsedFrame(firstByte, mask, payload, releaseable);
}
private ParsedFrame autoFragment(ByteBuffer buffer, int fragmentSize)
{
payloadLength -= fragmentSize;
byte[] nextMask = null;
if (mask != null)
{
int shift = fragmentSize % 4;
nextMask = new byte[4];
nextMask[0] = mask[(0 + shift) % 4];
nextMask[1] = mask[(1 + shift) % 4];
nextMask[2] = mask[(2 + shift) % 4];
nextMask[3] = mask[(3 + shift) % 4];
}
ByteBuffer content = buffer.slice();
content.limit(fragmentSize);
buffer.position(buffer.position() + fragmentSize);
final ParsedFrame frame = newFrame((byte)(firstByte & 0x7F), mask, content, false);
mask = nextMask;
firstByte = (byte)((firstByte & 0x80) | OpCode.CONTINUATION);
state = State.FRAGMENT;
return frame;
}
private ParsedFrame parsePayload(ByteBuffer buffer)
{
if (payloadLength == 0)
@ -288,35 +319,21 @@ public class Parser
return null;
int available = buffer.remaining();
boolean isDataFrame = OpCode.isDataFrame(OpCode.getOpCode(firstByte));
// Always autoFragment data frames if payloadLength is greater than maxFrameSize.
long maxFrameSize = configuration.getMaxFrameSize();
if (maxFrameSize > 0 && isDataFrame && payloadLength > maxFrameSize)
return autoFragment(buffer, (int)Math.min(available, maxFrameSize));
if (aggregate == null)
{
if (available < payloadLength)
{
// not enough to complete this frame
// not enough to complete this frame
// Can we auto-fragment
if (configuration.isAutoFragment() && OpCode.isDataFrame(OpCode.getOpCode(firstByte)))
{
payloadLength -= available;
byte[] nextMask = null;
if (mask != null)
{
int shift = available % 4;
nextMask = new byte[4];
nextMask[0] = mask[(0 + shift) % 4];
nextMask[1] = mask[(1 + shift) % 4];
nextMask[2] = mask[(2 + shift) % 4];
nextMask[3] = mask[(3 + shift) % 4];
}
final ParsedFrame frame = newFrame((byte)(firstByte & 0x7F), mask, buffer.slice(), false);
buffer.position(buffer.limit());
mask = nextMask;
firstByte = (byte)((firstByte & 0x80) | OpCode.CONTINUATION);
state = State.FRAGMENT;
return frame;
}
if (configuration.isAutoFragment() && isDataFrame)
return autoFragment(buffer, available);
// No space in the buffer, so we have to copy the partial payload
aggregate = bufferPool.acquire(payloadLength, false);

View File

@ -0,0 +1,164 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AutoFragmentTest
{
private WebSocketServer server;
private TestFrameHandler serverHandler;
private URI serverUri;
private WebSocketCoreClient client;
@BeforeEach
public void setup() throws Exception
{
serverHandler = new TestFrameHandler();
server = new WebSocketServer(serverHandler);
server.start();
serverUri = new URI("ws://localhost:" + server.getLocalPort());
client = new WebSocketCoreClient();
client.start();
}
@AfterEach
public void stop() throws Exception
{
client.stop();
server.stop();
}
@Test
public void testAutoFragmentToMaxFrameSize() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(clientHandler, serverUri);
connect.get(5, TimeUnit.SECONDS);
// Turn off fragmentation on the client.
clientHandler.coreSession.setMaxFrameSize(0);
clientHandler.coreSession.setAutoFragment(false);
// Set the server should fragment to the maxFrameSize.
int maxFrameSize = 30;
assertTrue(serverHandler.open.await(5, TimeUnit.SECONDS));
serverHandler.coreSession.setMaxFrameSize(maxFrameSize);
serverHandler.coreSession.setAutoFragment(true);
// Send a message which is too large.
int size = maxFrameSize * 2;
byte[] message = new byte[size];
Arrays.fill(message, 0, size, (byte)'X');
clientHandler.coreSession.sendFrame(new Frame(OpCode.BINARY, BufferUtil.toBuffer(message)), Callback.NOOP, false);
// We should not receive any frames larger than the max frame size.
// So our message should be split into two frames.
Frame frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.BINARY));
assertThat(frame.getPayloadLength(), is(maxFrameSize));
assertThat(frame.isFin(), is(false));
// Second frame should be final and contain rest of the data.
frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.CONTINUATION));
assertThat(frame.getPayloadLength(), is(maxFrameSize));
assertThat(frame.isFin(), is(true));
clientHandler.sendClose();
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
}
@Disabled("permessage-deflate autoFragment not implemented yet")
@Test
public void testAutoFragmentWithPermessageDeflate() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(client, serverUri, clientHandler);
upgradeRequest.addExtensions("permessage-deflate");
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(upgradeRequest);
connect.get(5, TimeUnit.SECONDS);
// Turn off fragmentation on the client.
clientHandler.coreSession.setMaxFrameSize(0);
clientHandler.coreSession.setAutoFragment(false);
// Set a small maxFrameSize on the server.
int maxFrameSize = 10;
assertTrue(serverHandler.open.await(5, TimeUnit.SECONDS));
serverHandler.coreSession.setMaxFrameSize(maxFrameSize);
serverHandler.coreSession.setAutoFragment(true);
// Generate a large random payload.
int payloadSize = 1000;
Random rand = new Random();
ByteBuffer payload = BufferUtil.allocate(payloadSize);
BufferUtil.clearToFill(payload);
for (int i=0; i<payloadSize; i++)
payload.put((byte)rand.nextInt(Byte.MAX_VALUE));
BufferUtil.flipToFlush(payload, 0);
// Send the large random payload which should be fragmented on the server.
clientHandler.coreSession.sendFrame(new Frame(OpCode.BINARY, BufferUtil.copy(payload)), Callback.NOOP, false);
// Assemble the message from the fragmented frames.
ByteBuffer message = BufferUtil.allocate(payloadSize*2);
Frame frame = serverHandler.receivedFrames.poll(1, TimeUnit.SECONDS);
while (frame != null)
{
int framePayloadLen = frame.getPayloadLength();
int append = BufferUtil.append(message, frame.getPayload());
assertThat(framePayloadLen, lessThanOrEqualTo(maxFrameSize));
assertThat(append, is(framePayloadLen));
frame = serverHandler.receivedFrames.poll(1, TimeUnit.SECONDS);
}
assertThat(message, is(payload));
clientHandler.sendClose();
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
}
}

View File

@ -57,6 +57,8 @@ public class ParserCapture
ExtensionStack exStack = new ExtensionStack(components, Behavior.SERVER);
exStack.negotiate(new LinkedList<>(), new LinkedList<>());
this.coreSession = new WebSocketCoreSession(new TestMessageHandler(), behavior, Negotiated.from(exStack));
coreSession.setAutoFragment(false);
coreSession.setMaxFrameSize(0);
this.parser = new Parser(components.getBufferPool(), coreSession);
}

View File

@ -1542,6 +1542,7 @@ public class ParserTest
ByteBuffer buffer = BufferUtil.allocate(32);
ParserCapture capture = new ParserCapture(false, Behavior.SERVER);
capture.getCoreSession().setAutoFragment(true);
data.limit(6 + 5);
BufferUtil.append(buffer, data);

View File

@ -57,7 +57,8 @@ public class TestFrameHandler implements SynchronousFrameHandler
@Override
public void onOpen(CoreSession coreSession)
{
LOG.info("onOpen {}", coreSession);
if (LOG.isDebugEnabled())
LOG.debug("onOpen {}", coreSession);
this.coreSession = coreSession;
open.countDown();
}
@ -65,41 +66,47 @@ public class TestFrameHandler implements SynchronousFrameHandler
@Override
public void onFrame(Frame frame)
{
LOG.info("onFrame: " + OpCode.name(frame.getOpCode()) + ":" + BufferUtil.toDetailString(frame.getPayload()));
if (LOG.isDebugEnabled())
LOG.debug("onFrame: " + OpCode.name(frame.getOpCode()) + ":" + BufferUtil.toDetailString(frame.getPayload()));
receivedFrames.offer(Frame.copy(frame));
}
@Override
public void onClosed(CloseStatus closeStatus)
{
LOG.info("onClosed {}", closeStatus);
if (LOG.isDebugEnabled())
LOG.debug("onClosed {}", closeStatus);
closed.countDown();
}
@Override
public void onError(Throwable cause)
{
LOG.info("onError {} ", cause == null ? null : cause.toString());
if (LOG.isDebugEnabled())
LOG.debug("onError {} ", cause == null ? null : cause.toString());
failure = cause;
error.countDown();
}
public void sendText(String text)
{
LOG.info("sendText {} ", text);
if (LOG.isDebugEnabled())
LOG.debug("sendText {} ", text);
Frame frame = new Frame(OpCode.TEXT, text);
getCoreSession().sendFrame(frame, Callback.NOOP, false);
}
public void sendFrame(Frame frame)
{
LOG.info("sendFrame {} ", frame);
if (LOG.isDebugEnabled())
LOG.debug("sendFrame {} ", frame);
getCoreSession().sendFrame(frame, Callback.NOOP, false);
}
public void sendClose()
{
LOG.info("sendClose");
if (LOG.isDebugEnabled())
LOG.debug("sendClose");
Frame frame = new Frame(OpCode.CLOSE);
getCoreSession().sendFrame(frame, Callback.NOOP, false);
}