From 9b8540aef41668e2ad441dbcb3c65490e531c0fd Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Thu, 26 Apr 2018 12:50:32 -0500 Subject: [PATCH 1/5] OpCode.isDataFrame(byte) missing CONTINUATION Signed-off-by: Joakim Erdfelt --- .../main/java/org/eclipse/jetty/websocket/common/OpCode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/OpCode.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/OpCode.java index 43a221c0c99..41afcd57c5e 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/OpCode.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/OpCode.java @@ -74,7 +74,7 @@ public final class OpCode public static boolean isDataFrame(byte opcode) { - return (opcode == TEXT) || (opcode == BINARY); + return (opcode == TEXT) || (opcode == BINARY) || (opcode == CONTINUATION); } /** From 5d83d98e9673a0975d9f36aaa245430015341cfb Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Thu, 26 Apr 2018 12:51:18 -0500 Subject: [PATCH 2/5] AbstractExtension properly handles Callback error now Signed-off-by: Joakim Erdfelt --- .../common/extensions/AbstractExtension.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/AbstractExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/AbstractExtension.java index d5bfc408ba5..bf7d1a10486 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/AbstractExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/AbstractExtension.java @@ -185,8 +185,18 @@ public abstract class AbstractExtension extends AbstractLifeCycle implements Dum protected void nextOutgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { - log.debug("nextOutgoingFrame({})",frame); - this.nextOutgoing.outgoingFrame(frame,callback, batchMode); + try + { + log.debug("nextOutgoingFrame({})", frame); + this.nextOutgoing.outgoingFrame(frame, callback, batchMode); + } + catch (Throwable t) + { + if (callback != null) + callback.writeFailed(t); + else + log.warn(t); + } } public void setBufferPool(ByteBufferPool bufferPool) From 440d89750a61d1fb2a3202ce44f2d933b52903c6 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Thu, 26 Apr 2018 12:51:49 -0500 Subject: [PATCH 3/5] Track frameCount for tests now Signed-off-by: Joakim Erdfelt --- .../jetty/websocket/common/SaneFrameOrderingAssertion.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/SaneFrameOrderingAssertion.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/SaneFrameOrderingAssertion.java index b8aeb16dfd6..2140d234c99 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/SaneFrameOrderingAssertion.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/SaneFrameOrderingAssertion.java @@ -34,6 +34,7 @@ import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; public class SaneFrameOrderingAssertion implements OutgoingFrames { boolean priorDataFrame = false; + public int frameCount = 0; @Override public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) @@ -63,11 +64,13 @@ public class SaneFrameOrderingAssertion implements OutgoingFrames break; } - if (OpCode.isDataFrame(frame.getOpCode())) + if (OpCode.isDataFrame(opcode)) { priorDataFrame = !frame.isFin(); } + frameCount++; + if (callback != null) callback.writeSuccess(); } From c596fcaac18f2a1c26fa45c31e787b2da38c3548 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Thu, 26 Apr 2018 12:54:15 -0500 Subject: [PATCH 4/5] Issue #2491 - FragmentExtension produces out of order frames + Adding testcase to ensure no regression + All data frames that arrive, are now sent through the IteratingCallback to ensure that that input frame order is preserved. Signed-off-by: Joakim Erdfelt --- .../fragment/FragmentExtension.java | 47 +++++++--- .../extensions/FragmentExtensionTest.java | 87 ++++++++++++++++++- 2 files changed, 121 insertions(+), 13 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java index 51239299b44..7f5cb447000 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java @@ -60,19 +60,28 @@ public class FragmentExtension extends AbstractExtension @Override public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { - ByteBuffer payload = frame.getPayload(); - int length = payload != null ? payload.remaining() : 0; - if (OpCode.isControlFrame(frame.getOpCode()) || maxLength <= 0 || length <= maxLength) + if (OpCode.isControlFrame(frame.getOpCode())) { + // Skip fragment of control frames nextOutgoingFrame(frame, callback, batchMode); return; } - FrameEntry entry = new FrameEntry(frame, callback, batchMode); - if (LOG.isDebugEnabled()) - LOG.debug("Queuing {}", entry); - offerEntry(entry); - flusher.iterate(); + // Handle everything else (data frames) + if(maxLength >= 1) + { + FrameEntry entry = new FrameEntry(frame, callback, batchMode); + if (LOG.isDebugEnabled()) + LOG.debug("Queuing {}", entry); + offerEntry(entry); + flusher.iterate(); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Pass Through {}", frame); + nextOutgoingFrame(frame, callback, batchMode); + } } @Override @@ -124,14 +133,17 @@ public class FragmentExtension extends AbstractExtension private boolean finished = true; @Override - protected Action process() throws Exception + protected Action process() { if (finished) { current = pollEntry(); - LOG.debug("Processing {}", current); if (current == null) + { + LOG.debug("Processing IDLE", current); return Action.IDLE; + } + LOG.debug("Processing {}", current); fragment(current, true); } else @@ -146,9 +158,19 @@ public class FragmentExtension extends AbstractExtension Frame frame = entry.frame; ByteBuffer payload = frame.getPayload(); int remaining = payload.remaining(); + int length = Math.min(remaining, maxLength); finished = length == remaining; + if (first && finished) + { + // Simple send, no need to fragment. + if (LOG.isDebugEnabled()) + LOG.debug("Skip Fragmentation {}", frame); + nextOutgoingFrame(frame, this, entry.batchMode); + return; + } + boolean continuation = frame.getType().isContinuation() || !first; DataFrame fragment = new DataFrame(frame, continuation); boolean fin = frame.isFin() && finished; @@ -186,7 +208,10 @@ public class FragmentExtension extends AbstractExtension { // Notify first then call succeeded(), otherwise // write callbacks may be invoked out of order. - notifyCallbackSuccess(current.callback); + + // only notify current (original) frame on completion + if (finished) + notifyCallbackSuccess(current.callback); succeeded(); } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java index d1401160b01..5fd4771bf0c 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/FragmentExtensionTest.java @@ -18,36 +18,48 @@ package org.eclipse.jetty.websocket.common.extensions; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.common.OpCode; +import org.eclipse.jetty.websocket.common.SaneFrameOrderingAssertion; import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension; import org.eclipse.jetty.websocket.common.frames.ContinuationFrame; import org.eclipse.jetty.websocket.common.frames.PingFrame; import org.eclipse.jetty.websocket.common.frames.TextFrame; +import org.eclipse.jetty.websocket.common.io.FutureWriteCallback; import org.eclipse.jetty.websocket.common.test.ByteBufferAssert; import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture; import org.eclipse.jetty.websocket.common.test.OutgoingFramesCapture; import org.junit.Assert; import org.junit.Test; +@SuppressWarnings("Duplicates") public class FragmentExtensionTest { + private static final Logger LOG = Log.getLogger(FragmentExtensionTest.class); + public ByteBufferPool bufferPool = new MappedByteBufferPool(); /** @@ -139,6 +151,7 @@ public class FragmentExtensionTest /** * Verify that outgoing text frames are fragmented by the maxLength configuration. + * * @throws IOException on test failure */ @Test @@ -211,11 +224,12 @@ public class FragmentExtensionTest } /** - * Verify that outgoing text frames are fragmented by default configuration + * Verify that outgoing text frames are not fragmented by default configuration (which has no maxLength specified) + * * @throws IOException on test failure */ @Test - public void testOutgoingFramesDefaultConfig() throws IOException + public void testOutgoingFramesDefaultConfig() throws Exception { OutgoingFramesCapture capture = new OutgoingFramesCapture(); @@ -277,6 +291,7 @@ public class FragmentExtensionTest /** * Outgoing PING (Control Frame) should pass through extension unmodified + * * @throws IOException on test failure */ @Test @@ -312,4 +327,72 @@ public class FragmentExtensionTest Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining())); ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice()); } + + /** + * Ensure that FragmentExtension honors the correct order of websocket frames. + * + * @see eclipse/jetty.project#2491 + */ + @Test + public void testLargeSmallTextAlternating() throws Exception + { + final int largeMessageSize = 60000; + byte buf[] = new byte[largeMessageSize]; + Arrays.fill(buf, (byte) 'x'); + String largeMessage = new String(buf, UTF_8); + + final int fragmentCount = 10; + final int fragmentLength = largeMessageSize / fragmentCount; + final int messageCount = 10000; + + FragmentExtension ext = new FragmentExtension(); + ext.setBufferPool(bufferPool); + ext.setPolicy(WebSocketPolicy.newServerPolicy()); + ExtensionConfig config = ExtensionConfig.parse("fragment;maxLength=" + fragmentLength); + ext.setConfig(config); + SaneFrameOrderingAssertion saneFrameOrderingAssertion = new SaneFrameOrderingAssertion(); + ext.setNextOutgoingFrames(saneFrameOrderingAssertion); + + CompletableFuture enqueuedFrameCountFut = new CompletableFuture<>(); + + CompletableFuture.runAsync(() -> { + // Run Server Task + int frameCount = 0; + BatchMode batchMode = BatchMode.OFF; + try + { + for (int i = 0; i < messageCount; i++) + { + int messageId = i; + FutureWriteCallback callback = new FutureWriteCallback(); + WebSocketFrame frame; + if (i % 2 == 0) + { + frame = new TextFrame().setPayload(largeMessage); + frameCount += fragmentCount; + } + else + { + frame = new TextFrame().setPayload("Short Message: " + i); + frameCount++; + } + ext.outgoingFrame(frame, callback, batchMode); + callback.get(); + } + enqueuedFrameCountFut.complete(frameCount); + } + catch (Throwable t) + { + enqueuedFrameCountFut.completeExceptionally(t); + } + }); + + int enqueuedFrameCount = enqueuedFrameCountFut.get(5, SECONDS); + + int expectedFrameCount = (messageCount/2) * fragmentCount; // large messages + expectedFrameCount += (messageCount/2); // + short messages + + assertThat("Saw expected frame count", saneFrameOrderingAssertion.frameCount, is(expectedFrameCount)); + assertThat("Enqueued expected frame count", enqueuedFrameCount, is(expectedFrameCount)); + } } From 76dec16fde62c916c9c3928eed3f1d1927476f50 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Thu, 26 Apr 2018 16:12:29 -0500 Subject: [PATCH 5/5] Issue #2491 - minor revert based on review --- .../fragment/FragmentExtension.java | 34 +++++-------------- 1 file changed, 8 insertions(+), 26 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java index 7f5cb447000..38b8eb832a0 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/fragment/FragmentExtension.java @@ -60,28 +60,19 @@ public class FragmentExtension extends AbstractExtension @Override public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { - if (OpCode.isControlFrame(frame.getOpCode())) + ByteBuffer payload = frame.getPayload(); + int length = payload != null ? payload.remaining() : 0; + if (OpCode.isControlFrame(frame.getOpCode()) || maxLength <= 0 || length <= maxLength) { - // Skip fragment of control frames nextOutgoingFrame(frame, callback, batchMode); return; } - // Handle everything else (data frames) - if(maxLength >= 1) - { - FrameEntry entry = new FrameEntry(frame, callback, batchMode); - if (LOG.isDebugEnabled()) - LOG.debug("Queuing {}", entry); - offerEntry(entry); - flusher.iterate(); - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Pass Through {}", frame); - nextOutgoingFrame(frame, callback, batchMode); - } + FrameEntry entry = new FrameEntry(frame, callback, batchMode); + if (LOG.isDebugEnabled()) + LOG.debug("Queuing {}", entry); + offerEntry(entry); + flusher.iterate(); } @Override @@ -162,15 +153,6 @@ public class FragmentExtension extends AbstractExtension int length = Math.min(remaining, maxLength); finished = length == remaining; - if (first && finished) - { - // Simple send, no need to fragment. - if (LOG.isDebugEnabled()) - LOG.debug("Skip Fragmentation {}", frame); - nextOutgoingFrame(frame, this, entry.batchMode); - return; - } - boolean continuation = frame.getType().isContinuation() || !first; DataFrame fragment = new DataFrame(frame, continuation); boolean fin = frame.isFin() && finished;