Issue #4152 - fragment outgoing frames before the ExtensionStack (#4232)

* Added FragmentingFlusher to abstract the fragmentation of frames.

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* Handle failure case of Fragmenting Flusher

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* Fragment outgoing frames before the ExtensionStack

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* update javadoc

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* separate frame size validation for incoming and outgoing frames

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* fix test

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* reimplement FragmentingFlusher with the new TransformingFlusher

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan 2019-11-26 14:51:40 +11:00 committed by GitHub
parent 2dbd19f902
commit efe76ff2e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 202 additions and 225 deletions

View File

@ -18,18 +18,13 @@
package org.eclipse.jetty.websocket.core.internal;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.AbstractExtension;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
/**
@ -39,9 +34,20 @@ public class FragmentExtension extends AbstractExtension
{
private static final Logger LOG = Log.getLogger(FragmentExtension.class);
private final Queue<FrameEntry> entries = new ArrayDeque<>();
private final IteratingCallback flusher = new Flusher();
private int maxLength;
private final FragmentingFlusher flusher;
private final FrameHandler.Configuration configuration = new FrameHandler.ConfigurationHolder();
public FragmentExtension()
{
flusher = new FragmentingFlusher(configuration)
{
@Override
void forwardFrame(Frame frame, Callback callback, boolean batch)
{
nextOutgoingFrame(frame, callback, batch);
}
};
}
@Override
public String getName()
@ -58,154 +64,14 @@ public class FragmentExtension extends AbstractExtension
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
ByteBuffer payload = frame.getPayload();
int length = payload != null ? payload.remaining() : 0;
if (OpCode.isControlFrame(frame.getOpCode()) || maxLength <= 0 || length <= maxLength)
{
nextOutgoingFrame(frame, callback, batch);
return;
}
FrameEntry entry = new FrameEntry(frame, callback, batch);
if (LOG.isDebugEnabled())
LOG.debug("Queuing {}", entry);
offerEntry(entry);
flusher.iterate();
flusher.sendFrame(frame, callback, batch);
}
@Override
public void init(ExtensionConfig config, WebSocketComponents components)
{
super.init(config, components);
maxLength = config.getParameter("maxLength", -1);
}
private void offerEntry(FrameEntry entry)
{
synchronized (this)
{
entries.offer(entry);
}
}
private FrameEntry pollEntry()
{
synchronized (this)
{
return entries.poll();
}
}
private class Flusher extends IteratingCallback implements Callback
{
private FrameEntry current;
private boolean finished = true;
@Override
protected Action process() throws Exception
{
if (finished)
{
current = pollEntry();
LOG.debug("Processing {}", current);
if (current == null)
return Action.IDLE;
fragment(current, true);
}
else
{
fragment(current, false);
}
return Action.SCHEDULED;
}
private void fragment(FrameEntry entry, boolean first)
{
Frame frame = entry.frame;
ByteBuffer payload = frame.getPayload();
int remaining = payload.remaining();
int length = Math.min(remaining, maxLength);
finished = length == remaining;
boolean continuation = (frame.getOpCode() == OpCode.CONTINUATION) || !first;
Frame fragment = new Frame(continuation ? OpCode.CONTINUATION : frame.getOpCode());
boolean fin = frame.isFin() && finished;
fragment.setFin(fin);
int limit = payload.limit();
int newLimit = payload.position() + length;
payload.limit(newLimit);
ByteBuffer payloadFragment = payload.slice();
payload.limit(limit);
fragment.setPayload(payloadFragment);
if (LOG.isDebugEnabled())
LOG.debug("Fragmented {}->{}", frame, fragment);
payload.position(newLimit);
nextOutgoingFrame(fragment, this, entry.batch);
}
@Override
protected void onCompleteSuccess()
{
// This IteratingCallback never completes.
}
@Override
protected void onCompleteFailure(Throwable x)
{
// This IteratingCallback never fails.
// The callback are those provided by WriteCallback (implemented
// below) and even in case of writeFailed() we call succeeded().
}
@Override
public void succeeded()
{
// Notify first then call succeeded(), otherwise
// write callbacks may be invoked out of order.
notifyCallbackSuccess(current.callback);
super.succeeded();
}
@Override
public void failed(Throwable cause)
{
// Notify first, the call succeeded() to drain the queue.
// We don't want to call failed(x) because that will put
// this flusher into a final state that cannot be exited,
// and the failure of a frame may not mean that the whole
// connection is now invalid.
notifyCallbackFailure(current.callback, cause);
succeeded();
}
private void notifyCallbackSuccess(Callback callback)
{
try
{
if (callback != null)
callback.succeeded();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Exception while notifying success of callback " + callback, x);
}
}
private void notifyCallbackFailure(Callback callback, Throwable failure)
{
try
{
if (callback != null)
callback.failed(failure);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Exception while notifying failure of callback " + callback, x);
}
}
int maxLength = config.getParameter("maxLength", -1);
configuration.setMaxFrameSize(maxLength);
}
}

View File

@ -0,0 +1,109 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.internal;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler.Configuration;
import org.eclipse.jetty.websocket.core.OpCode;
/**
* Used to split large data frames into multiple frames below the maxFrameSize.
* Control frames and dataFrames smaller than the maxFrameSize will be forwarded
* directly to {@link #forwardFrame(Frame, Callback, boolean)}.
*/
public abstract class FragmentingFlusher extends TransformingFlusher
{
private static final Logger LOG = Log.getLogger(FragmentingFlusher.class);
private final Configuration configuration;
private FrameEntry current;
public FragmentingFlusher(Configuration configuration)
{
this.configuration = configuration;
}
abstract void forwardFrame(Frame frame, Callback callback, boolean batch);
@Override
protected boolean onFrame(Frame frame, Callback callback, boolean batch)
{
long maxFrameSize = configuration.getMaxFrameSize();
if (frame.isControlFrame() || maxFrameSize <= 0 || frame.getPayloadLength() <= maxFrameSize)
{
forwardFrame(frame, callback, batch);
return true;
}
current = new FrameEntry(frame, callback, batch);
boolean finished = fragment(callback, true);
if (finished)
current = null;
return finished;
}
@Override
protected boolean transform(Callback callback)
{
boolean finished = fragment(callback, false);
if (finished)
current = null;
return finished;
}
private boolean fragment(Callback callback, boolean first)
{
Frame frame = current.frame;
ByteBuffer payload = frame.getPayload();
int remaining = payload.remaining();
long maxFrameSize = configuration.getMaxFrameSize();
int fragmentSize = (int)Math.min(remaining, maxFrameSize);
boolean continuation = (frame.getOpCode() == OpCode.CONTINUATION) || !first;
Frame fragment = new Frame(continuation ? OpCode.CONTINUATION : frame.getOpCode());
boolean finished = (maxFrameSize <= 0 || remaining <= maxFrameSize);
fragment.setFin(frame.isFin() && finished);
// If we don't need to fragment just forward with original payload.
if (finished)
{
fragment.setPayload(frame.getPayload());
forwardFrame(fragment, callback, current.batch);
return true;
}
// Slice the fragmented payload from the buffer.
int limit = payload.limit();
int newLimit = payload.position() + fragmentSize;
payload.limit(newLimit);
ByteBuffer payloadFragment = payload.slice();
payload.limit(limit);
fragment.setPayload(payloadFragment);
payload.position(newLimit);
if (LOG.isDebugEnabled())
LOG.debug("Fragmented {}->{}", frame, fragment);
forwardFrame(fragment, callback, current.batch);
return false;
}
}

View File

@ -23,16 +23,13 @@ import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
@ -44,6 +41,7 @@ import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.IncomingFrames;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.OutgoingFrames;
import org.eclipse.jetty.websocket.core.ProtocolException;
@ -67,7 +65,7 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe
private final FrameHandler handler;
private final Negotiated negotiated;
private final boolean demanding;
private final Flusher flusher = new Flusher();
private final Flusher flusher = new Flusher(this);
private WebSocketConnection connection;
private boolean autoFragment = WebSocketConstants.DEFAULT_AUTO_FRAGMENT;
@ -100,6 +98,10 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe
{
assertValidFrame(frame);
// Validate frame size.
if (maxFrameSize > 0 && frame.getPayloadLength() > maxFrameSize)
throw new MessageTooLargeException("Cannot handle payload lengths larger than " + maxFrameSize);
// Assert Incoming Frame Behavior Required by RFC-6455 / Section 5.1
switch (behavior)
{
@ -132,6 +134,10 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe
{
assertValidFrame(frame);
// Validate frame size (allowed to be over max frame size if autoFragment is true).
if (!autoFragment && maxFrameSize > 0 && frame.getPayloadLength() > maxFrameSize)
throw new MessageTooLargeException("Cannot handle payload lengths larger than " + maxFrameSize);
/*
* RFC 6455 Section 5.5.1
* close frame payload is specially formatted which is checked in CloseStatus
@ -155,7 +161,7 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe
if (!OpCode.isKnown(frame.getOpCode()))
throw new ProtocolException("Unknown opcode: " + frame.getOpCode());
int payloadLength = (frame.getPayload() == null) ? 0 : frame.getPayload().remaining();
int payloadLength = frame.getPayloadLength();
if (frame.isControlFrame())
{
if (!frame.isFin())
@ -520,26 +526,22 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe
try
{
synchronized (flusher)
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
boolean closeConnection = sessionState.onOutgoingFrame(frame);
if (closeConnection)
{
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
Callback closeConnectionCallback = Callback.from(
() -> closeConnection(sessionState.getCloseStatus(), callback),
t -> closeConnection(sessionState.getCloseStatus(), Callback.from(callback, t)));
boolean closeConnection = sessionState.onOutgoingFrame(frame);
if (closeConnection)
{
Callback closeConnectionCallback = Callback.from(
() -> closeConnection(sessionState.getCloseStatus(), callback),
t -> closeConnection(sessionState.getCloseStatus(), Callback.from(callback, t)));
flusher.queue.offer(new FrameEntry(frame, closeConnectionCallback, false));
}
else
{
flusher.queue.offer(new FrameEntry(frame, callback, batch));
}
flusher.sendFrame(frame, closeConnectionCallback, false);
}
else
{
flusher.sendFrame(frame, callback, batch);
}
flusher.iterate();
}
catch (Throwable t)
{
@ -562,11 +564,7 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe
@Override
public void flush(Callback callback)
{
synchronized (flusher)
{
flusher.queue.offer(new FrameEntry(FrameFlusher.FLUSH_FRAME, callback, false));
}
flusher.iterate();
flusher.sendFrame(FrameFlusher.FLUSH_FRAME, callback, false);
}
@Override
@ -791,57 +789,17 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe
handler);
}
private class Flusher extends IteratingCallback
private class Flusher extends FragmentingFlusher
{
private final Queue<FrameEntry> queue = new ArrayDeque<>();
FrameEntry entry;
@Override
protected Action process() throws Throwable
public Flusher(FrameHandler.Configuration configuration)
{
synchronized (this)
{
entry = queue.poll();
}
if (entry == null)
return Action.IDLE;
negotiated.getExtensions().sendFrame(entry.frame, this, entry.batch);
return Action.SCHEDULED;
super(configuration);
}
@Override
public void succeeded()
void forwardFrame(Frame frame, Callback callback, boolean batch)
{
entry.callback.succeeded();
super.succeeded();
}
@Override
protected void onCompleteFailure(Throwable cause)
{
entry.callback.failed(cause);
Queue<FrameEntry> entries;
synchronized (this)
{
entries = new ArrayDeque<>(queue);
queue.clear();
}
entries.forEach(e -> failEntry(cause, e));
}
private void failEntry(Throwable cause, FrameEntry e)
{
try
{
e.callback.failed(cause);
}
catch (Throwable x)
{
if (cause != x)
cause.addSuppressed(x);
LOG.warn(cause);
}
negotiated.getExtensions().sendFrame(frame, callback, batch);
}
}
}

View File

@ -71,7 +71,50 @@ public class AutoFragmentTest
}
@Test
public void testAutoFragmentToMaxFrameSize() throws Exception
public void testOutgoingAutoFragmentToMaxFrameSize() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(clientHandler, serverUri);
connect.get(5, TimeUnit.SECONDS);
// Turn off fragmentation on the server.
assertTrue(serverHandler.open.await(5, TimeUnit.SECONDS));
serverHandler.coreSession.setMaxFrameSize(0);
serverHandler.coreSession.setAutoFragment(false);
// Set the client to fragment to the maxFrameSize.
int maxFrameSize = 30;
clientHandler.coreSession.setMaxFrameSize(maxFrameSize);
clientHandler.coreSession.setAutoFragment(true);
// Send a message which is too large.
int size = maxFrameSize * 2;
byte[] message = new byte[size];
Arrays.fill(message, 0, size, (byte)'X');
clientHandler.coreSession.sendFrame(new Frame(OpCode.BINARY, BufferUtil.toBuffer(message)), Callback.NOOP, false);
// We should not receive any frames larger than the max frame size.
// So our message should be split into two frames.
Frame frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.BINARY));
assertThat(frame.getPayloadLength(), is(maxFrameSize));
assertThat(frame.isFin(), is(false));
// Second frame should be final and contain rest of the data.
frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.CONTINUATION));
assertThat(frame.getPayloadLength(), is(maxFrameSize));
assertThat(frame.isFin(), is(true));
clientHandler.sendClose();
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
}
@Test
public void testIncomingAutoFragmentToMaxFrameSize() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(clientHandler, serverUri);

View File

@ -62,6 +62,7 @@ public class ParsePayloadLengthTest
public void testPayloadLength(int size, String description) throws InterruptedException
{
ParserCapture capture = new ParserCapture();
capture.getCoreSession().setMaxFrameSize(0);
ByteBuffer raw = BufferUtil.allocate(size + Generator.MAX_HEADER_LENGTH);
BufferUtil.clearToFill(raw);