From efe76ff2e0816f79fdaf0bd0fb444f466625f40d Mon Sep 17 00:00:00 2001 From: Lachlan Date: Tue, 26 Nov 2019 14:51:40 +1100 Subject: [PATCH] Issue #4152 - fragment outgoing frames before the ExtensionStack (#4232) * Added FragmentingFlusher to abstract the fragmentation of frames. Signed-off-by: Lachlan Roberts * Handle failure case of Fragmenting Flusher Signed-off-by: Lachlan Roberts * Fragment outgoing frames before the ExtensionStack Signed-off-by: Lachlan Roberts * update javadoc Signed-off-by: Lachlan Roberts * separate frame size validation for incoming and outgoing frames Signed-off-by: Lachlan Roberts * fix test Signed-off-by: Lachlan Roberts * reimplement FragmentingFlusher with the new TransformingFlusher Signed-off-by: Lachlan Roberts --- .../core/internal/FragmentExtension.java | 170 ++---------------- .../core/internal/FragmentingFlusher.java | 109 +++++++++++ .../core/internal/WebSocketCoreSession.java | 102 ++++------- .../websocket/core/AutoFragmentTest.java | 45 ++++- .../core/ParsePayloadLengthTest.java | 1 + 5 files changed, 202 insertions(+), 225 deletions(-) create mode 100644 jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentingFlusher.java diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentExtension.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentExtension.java index cf2ed8cacbc..5f481d800d4 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentExtension.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentExtension.java @@ -18,18 +18,13 @@ package org.eclipse.jetty.websocket.core.internal; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.Queue; - import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.core.AbstractExtension; import org.eclipse.jetty.websocket.core.ExtensionConfig; import org.eclipse.jetty.websocket.core.Frame; -import org.eclipse.jetty.websocket.core.OpCode; +import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.WebSocketComponents; /** @@ -39,9 +34,20 @@ public class FragmentExtension extends AbstractExtension { private static final Logger LOG = Log.getLogger(FragmentExtension.class); - private final Queue entries = new ArrayDeque<>(); - private final IteratingCallback flusher = new Flusher(); - private int maxLength; + private final FragmentingFlusher flusher; + private final FrameHandler.Configuration configuration = new FrameHandler.ConfigurationHolder(); + + public FragmentExtension() + { + flusher = new FragmentingFlusher(configuration) + { + @Override + void forwardFrame(Frame frame, Callback callback, boolean batch) + { + nextOutgoingFrame(frame, callback, batch); + } + }; + } @Override public String getName() @@ -58,154 +64,14 @@ public class FragmentExtension extends AbstractExtension @Override public void sendFrame(Frame frame, Callback callback, boolean batch) { - ByteBuffer payload = frame.getPayload(); - int length = payload != null ? payload.remaining() : 0; - if (OpCode.isControlFrame(frame.getOpCode()) || maxLength <= 0 || length <= maxLength) - { - nextOutgoingFrame(frame, callback, batch); - return; - } - - FrameEntry entry = new FrameEntry(frame, callback, batch); - if (LOG.isDebugEnabled()) - LOG.debug("Queuing {}", entry); - offerEntry(entry); - flusher.iterate(); + flusher.sendFrame(frame, callback, batch); } @Override public void init(ExtensionConfig config, WebSocketComponents components) { super.init(config, components); - maxLength = config.getParameter("maxLength", -1); - } - - private void offerEntry(FrameEntry entry) - { - synchronized (this) - { - entries.offer(entry); - } - } - - private FrameEntry pollEntry() - { - synchronized (this) - { - return entries.poll(); - } - } - - private class Flusher extends IteratingCallback implements Callback - { - private FrameEntry current; - private boolean finished = true; - - @Override - protected Action process() throws Exception - { - if (finished) - { - current = pollEntry(); - LOG.debug("Processing {}", current); - if (current == null) - return Action.IDLE; - fragment(current, true); - } - else - { - fragment(current, false); - } - return Action.SCHEDULED; - } - - private void fragment(FrameEntry entry, boolean first) - { - Frame frame = entry.frame; - ByteBuffer payload = frame.getPayload(); - int remaining = payload.remaining(); - int length = Math.min(remaining, maxLength); - finished = length == remaining; - - boolean continuation = (frame.getOpCode() == OpCode.CONTINUATION) || !first; - Frame fragment = new Frame(continuation ? OpCode.CONTINUATION : frame.getOpCode()); - boolean fin = frame.isFin() && finished; - fragment.setFin(fin); - - int limit = payload.limit(); - int newLimit = payload.position() + length; - payload.limit(newLimit); - ByteBuffer payloadFragment = payload.slice(); - payload.limit(limit); - fragment.setPayload(payloadFragment); - if (LOG.isDebugEnabled()) - LOG.debug("Fragmented {}->{}", frame, fragment); - payload.position(newLimit); - - nextOutgoingFrame(fragment, this, entry.batch); - } - - @Override - protected void onCompleteSuccess() - { - // This IteratingCallback never completes. - } - - @Override - protected void onCompleteFailure(Throwable x) - { - // This IteratingCallback never fails. - // The callback are those provided by WriteCallback (implemented - // below) and even in case of writeFailed() we call succeeded(). - } - - @Override - public void succeeded() - { - // Notify first then call succeeded(), otherwise - // write callbacks may be invoked out of order. - notifyCallbackSuccess(current.callback); - super.succeeded(); - } - - @Override - public void failed(Throwable cause) - { - // Notify first, the call succeeded() to drain the queue. - // We don't want to call failed(x) because that will put - // this flusher into a final state that cannot be exited, - // and the failure of a frame may not mean that the whole - // connection is now invalid. - notifyCallbackFailure(current.callback, cause); - succeeded(); - } - - private void notifyCallbackSuccess(Callback callback) - { - try - { - if (callback != null) - callback.succeeded(); - } - catch (Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("Exception while notifying success of callback " + callback, x); - } - } - - private void notifyCallbackFailure(Callback callback, Throwable failure) - { - try - { - if (callback != null) - callback.failed(failure); - } - catch (Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("Exception while notifying failure of callback " + callback, x); - } - } + int maxLength = config.getParameter("maxLength", -1); + configuration.setMaxFrameSize(maxLength); } } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentingFlusher.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentingFlusher.java new file mode 100644 index 00000000000..a42745b318a --- /dev/null +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentingFlusher.java @@ -0,0 +1,109 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.core.internal; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.core.Frame; +import org.eclipse.jetty.websocket.core.FrameHandler.Configuration; +import org.eclipse.jetty.websocket.core.OpCode; + +/** + * Used to split large data frames into multiple frames below the maxFrameSize. + * Control frames and dataFrames smaller than the maxFrameSize will be forwarded + * directly to {@link #forwardFrame(Frame, Callback, boolean)}. + */ +public abstract class FragmentingFlusher extends TransformingFlusher +{ + private static final Logger LOG = Log.getLogger(FragmentingFlusher.class); + private final Configuration configuration; + private FrameEntry current; + + public FragmentingFlusher(Configuration configuration) + { + this.configuration = configuration; + } + + abstract void forwardFrame(Frame frame, Callback callback, boolean batch); + + @Override + protected boolean onFrame(Frame frame, Callback callback, boolean batch) + { + long maxFrameSize = configuration.getMaxFrameSize(); + if (frame.isControlFrame() || maxFrameSize <= 0 || frame.getPayloadLength() <= maxFrameSize) + { + forwardFrame(frame, callback, batch); + return true; + } + + current = new FrameEntry(frame, callback, batch); + boolean finished = fragment(callback, true); + if (finished) + current = null; + return finished; + } + + @Override + protected boolean transform(Callback callback) + { + boolean finished = fragment(callback, false); + if (finished) + current = null; + return finished; + } + + private boolean fragment(Callback callback, boolean first) + { + Frame frame = current.frame; + ByteBuffer payload = frame.getPayload(); + int remaining = payload.remaining(); + long maxFrameSize = configuration.getMaxFrameSize(); + int fragmentSize = (int)Math.min(remaining, maxFrameSize); + + boolean continuation = (frame.getOpCode() == OpCode.CONTINUATION) || !first; + Frame fragment = new Frame(continuation ? OpCode.CONTINUATION : frame.getOpCode()); + boolean finished = (maxFrameSize <= 0 || remaining <= maxFrameSize); + fragment.setFin(frame.isFin() && finished); + + // If we don't need to fragment just forward with original payload. + if (finished) + { + fragment.setPayload(frame.getPayload()); + forwardFrame(fragment, callback, current.batch); + return true; + } + + // Slice the fragmented payload from the buffer. + int limit = payload.limit(); + int newLimit = payload.position() + fragmentSize; + payload.limit(newLimit); + ByteBuffer payloadFragment = payload.slice(); + payload.limit(limit); + fragment.setPayload(payloadFragment); + payload.position(newLimit); + if (LOG.isDebugEnabled()) + LOG.debug("Fragmented {}->{}", frame, fragment); + + forwardFrame(fragment, callback, current.batch); + return false; + } +} diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketCoreSession.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketCoreSession.java index 0bf46245bb7..d10c3b3faab 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketCoreSession.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketCoreSession.java @@ -23,16 +23,13 @@ import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.net.URI; import java.time.Duration; -import java.util.ArrayDeque; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.Utf8Appendable; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; @@ -44,6 +41,7 @@ import org.eclipse.jetty.websocket.core.ExtensionConfig; import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.IncomingFrames; +import org.eclipse.jetty.websocket.core.MessageTooLargeException; import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.core.OutgoingFrames; import org.eclipse.jetty.websocket.core.ProtocolException; @@ -67,7 +65,7 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe private final FrameHandler handler; private final Negotiated negotiated; private final boolean demanding; - private final Flusher flusher = new Flusher(); + private final Flusher flusher = new Flusher(this); private WebSocketConnection connection; private boolean autoFragment = WebSocketConstants.DEFAULT_AUTO_FRAGMENT; @@ -100,6 +98,10 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe { assertValidFrame(frame); + // Validate frame size. + if (maxFrameSize > 0 && frame.getPayloadLength() > maxFrameSize) + throw new MessageTooLargeException("Cannot handle payload lengths larger than " + maxFrameSize); + // Assert Incoming Frame Behavior Required by RFC-6455 / Section 5.1 switch (behavior) { @@ -132,6 +134,10 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe { assertValidFrame(frame); + // Validate frame size (allowed to be over max frame size if autoFragment is true). + if (!autoFragment && maxFrameSize > 0 && frame.getPayloadLength() > maxFrameSize) + throw new MessageTooLargeException("Cannot handle payload lengths larger than " + maxFrameSize); + /* * RFC 6455 Section 5.5.1 * close frame payload is specially formatted which is checked in CloseStatus @@ -155,7 +161,7 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe if (!OpCode.isKnown(frame.getOpCode())) throw new ProtocolException("Unknown opcode: " + frame.getOpCode()); - int payloadLength = (frame.getPayload() == null) ? 0 : frame.getPayload().remaining(); + int payloadLength = frame.getPayloadLength(); if (frame.isControlFrame()) { if (!frame.isFin()) @@ -520,26 +526,22 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe try { - synchronized (flusher) + if (LOG.isDebugEnabled()) + LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); + + boolean closeConnection = sessionState.onOutgoingFrame(frame); + if (closeConnection) { - if (LOG.isDebugEnabled()) - LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); + Callback closeConnectionCallback = Callback.from( + () -> closeConnection(sessionState.getCloseStatus(), callback), + t -> closeConnection(sessionState.getCloseStatus(), Callback.from(callback, t))); - boolean closeConnection = sessionState.onOutgoingFrame(frame); - if (closeConnection) - { - Callback closeConnectionCallback = Callback.from( - () -> closeConnection(sessionState.getCloseStatus(), callback), - t -> closeConnection(sessionState.getCloseStatus(), Callback.from(callback, t))); - - flusher.queue.offer(new FrameEntry(frame, closeConnectionCallback, false)); - } - else - { - flusher.queue.offer(new FrameEntry(frame, callback, batch)); - } + flusher.sendFrame(frame, closeConnectionCallback, false); + } + else + { + flusher.sendFrame(frame, callback, batch); } - flusher.iterate(); } catch (Throwable t) { @@ -562,11 +564,7 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe @Override public void flush(Callback callback) { - synchronized (flusher) - { - flusher.queue.offer(new FrameEntry(FrameFlusher.FLUSH_FRAME, callback, false)); - } - flusher.iterate(); + flusher.sendFrame(FrameFlusher.FLUSH_FRAME, callback, false); } @Override @@ -791,57 +789,17 @@ public class WebSocketCoreSession implements IncomingFrames, FrameHandler.CoreSe handler); } - private class Flusher extends IteratingCallback + private class Flusher extends FragmentingFlusher { - private final Queue queue = new ArrayDeque<>(); - FrameEntry entry; - - @Override - protected Action process() throws Throwable + public Flusher(FrameHandler.Configuration configuration) { - synchronized (this) - { - entry = queue.poll(); - } - if (entry == null) - return Action.IDLE; - - negotiated.getExtensions().sendFrame(entry.frame, this, entry.batch); - return Action.SCHEDULED; + super(configuration); } @Override - public void succeeded() + void forwardFrame(Frame frame, Callback callback, boolean batch) { - entry.callback.succeeded(); - super.succeeded(); - } - - @Override - protected void onCompleteFailure(Throwable cause) - { - entry.callback.failed(cause); - Queue entries; - synchronized (this) - { - entries = new ArrayDeque<>(queue); - queue.clear(); - } - entries.forEach(e -> failEntry(cause, e)); - } - - private void failEntry(Throwable cause, FrameEntry e) - { - try - { - e.callback.failed(cause); - } - catch (Throwable x) - { - if (cause != x) - cause.addSuppressed(x); - LOG.warn(cause); - } + negotiated.getExtensions().sendFrame(frame, callback, batch); } } } diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/AutoFragmentTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/AutoFragmentTest.java index cc85606a94d..7356e4fed6f 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/AutoFragmentTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/AutoFragmentTest.java @@ -71,7 +71,50 @@ public class AutoFragmentTest } @Test - public void testAutoFragmentToMaxFrameSize() throws Exception + public void testOutgoingAutoFragmentToMaxFrameSize() throws Exception + { + TestFrameHandler clientHandler = new TestFrameHandler(); + CompletableFuture connect = client.connect(clientHandler, serverUri); + connect.get(5, TimeUnit.SECONDS); + + // Turn off fragmentation on the server. + assertTrue(serverHandler.open.await(5, TimeUnit.SECONDS)); + serverHandler.coreSession.setMaxFrameSize(0); + serverHandler.coreSession.setAutoFragment(false); + + // Set the client to fragment to the maxFrameSize. + int maxFrameSize = 30; + clientHandler.coreSession.setMaxFrameSize(maxFrameSize); + clientHandler.coreSession.setAutoFragment(true); + + // Send a message which is too large. + int size = maxFrameSize * 2; + byte[] message = new byte[size]; + Arrays.fill(message, 0, size, (byte)'X'); + clientHandler.coreSession.sendFrame(new Frame(OpCode.BINARY, BufferUtil.toBuffer(message)), Callback.NOOP, false); + + // We should not receive any frames larger than the max frame size. + // So our message should be split into two frames. + Frame frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS); + assertNotNull(frame); + assertThat(frame.getOpCode(), is(OpCode.BINARY)); + assertThat(frame.getPayloadLength(), is(maxFrameSize)); + assertThat(frame.isFin(), is(false)); + + // Second frame should be final and contain rest of the data. + frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS); + assertNotNull(frame); + assertThat(frame.getOpCode(), is(OpCode.CONTINUATION)); + assertThat(frame.getPayloadLength(), is(maxFrameSize)); + assertThat(frame.isFin(), is(true)); + + clientHandler.sendClose(); + assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS)); + assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testIncomingAutoFragmentToMaxFrameSize() throws Exception { TestFrameHandler clientHandler = new TestFrameHandler(); CompletableFuture connect = client.connect(clientHandler, serverUri); diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/ParsePayloadLengthTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/ParsePayloadLengthTest.java index fd5693a4bee..2b85c9c1d68 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/ParsePayloadLengthTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/ParsePayloadLengthTest.java @@ -62,6 +62,7 @@ public class ParsePayloadLengthTest public void testPayloadLength(int size, String description) throws InterruptedException { ParserCapture capture = new ParserCapture(); + capture.getCoreSession().setMaxFrameSize(0); ByteBuffer raw = BufferUtil.allocate(size + Generator.MAX_HEADER_LENGTH); BufferUtil.clearToFill(raw);