Merge pull request #2493 from eclipse/jetty-9.4.x-issue-2491-ws-fragmentextension

Issue #2491 WebSocket FragmentExtension produces out of order Frames
This commit is contained in:
Joakim Erdfelt 2018-04-26 16:48:46 -05:00 committed by GitHub
commit 38cd903922
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 112 additions and 9 deletions

View File

@ -74,7 +74,7 @@ public final class OpCode
public static boolean isDataFrame(byte opcode) public static boolean isDataFrame(byte opcode)
{ {
return (opcode == TEXT) || (opcode == BINARY); return (opcode == TEXT) || (opcode == BINARY) || (opcode == CONTINUATION);
} }
/** /**

View File

@ -185,8 +185,18 @@ public abstract class AbstractExtension extends AbstractLifeCycle implements Dum
protected void nextOutgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) protected void nextOutgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{ {
log.debug("nextOutgoingFrame({})",frame); try
this.nextOutgoing.outgoingFrame(frame,callback, batchMode); {
log.debug("nextOutgoingFrame({})", frame);
this.nextOutgoing.outgoingFrame(frame, callback, batchMode);
}
catch (Throwable t)
{
if (callback != null)
callback.writeFailed(t);
else
log.warn(t);
}
} }
public void setBufferPool(ByteBufferPool bufferPool) public void setBufferPool(ByteBufferPool bufferPool)

View File

@ -124,14 +124,17 @@ public class FragmentExtension extends AbstractExtension
private boolean finished = true; private boolean finished = true;
@Override @Override
protected Action process() throws Exception protected Action process()
{ {
if (finished) if (finished)
{ {
current = pollEntry(); current = pollEntry();
LOG.debug("Processing {}", current);
if (current == null) if (current == null)
{
LOG.debug("Processing IDLE", current);
return Action.IDLE; return Action.IDLE;
}
LOG.debug("Processing {}", current);
fragment(current, true); fragment(current, true);
} }
else else
@ -146,6 +149,7 @@ public class FragmentExtension extends AbstractExtension
Frame frame = entry.frame; Frame frame = entry.frame;
ByteBuffer payload = frame.getPayload(); ByteBuffer payload = frame.getPayload();
int remaining = payload.remaining(); int remaining = payload.remaining();
int length = Math.min(remaining, maxLength); int length = Math.min(remaining, maxLength);
finished = length == remaining; finished = length == remaining;
@ -186,7 +190,10 @@ public class FragmentExtension extends AbstractExtension
{ {
// Notify first then call succeeded(), otherwise // Notify first then call succeeded(), otherwise
// write callbacks may be invoked out of order. // write callbacks may be invoked out of order.
notifyCallbackSuccess(current.callback);
// only notify current (original) frame on completion
if (finished)
notifyCallbackSuccess(current.callback);
succeeded(); succeeded();
} }

View File

@ -34,6 +34,7 @@ import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
public class SaneFrameOrderingAssertion implements OutgoingFrames public class SaneFrameOrderingAssertion implements OutgoingFrames
{ {
boolean priorDataFrame = false; boolean priorDataFrame = false;
public int frameCount = 0;
@Override @Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
@ -63,11 +64,13 @@ public class SaneFrameOrderingAssertion implements OutgoingFrames
break; break;
} }
if (OpCode.isDataFrame(frame.getOpCode())) if (OpCode.isDataFrame(opcode))
{ {
priorDataFrame = !frame.isFin(); priorDataFrame = !frame.isFin();
} }
frameCount++;
if (callback != null) if (callback != null)
callback.writeSuccess(); callback.writeSuccess();
} }

View File

@ -18,36 +18,48 @@
package org.eclipse.jetty.websocket.common.extensions; package org.eclipse.jetty.websocket.common.extensions;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.SaneFrameOrderingAssertion;
import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension; import org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension;
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame; import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.PingFrame; import org.eclipse.jetty.websocket.common.frames.PingFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame; import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
import org.eclipse.jetty.websocket.common.test.ByteBufferAssert; import org.eclipse.jetty.websocket.common.test.ByteBufferAssert;
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture; import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.test.OutgoingFramesCapture; import org.eclipse.jetty.websocket.common.test.OutgoingFramesCapture;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@SuppressWarnings("Duplicates")
public class FragmentExtensionTest public class FragmentExtensionTest
{ {
private static final Logger LOG = Log.getLogger(FragmentExtensionTest.class);
public ByteBufferPool bufferPool = new MappedByteBufferPool(); public ByteBufferPool bufferPool = new MappedByteBufferPool();
/** /**
@ -139,6 +151,7 @@ public class FragmentExtensionTest
/** /**
* Verify that outgoing text frames are fragmented by the maxLength configuration. * Verify that outgoing text frames are fragmented by the maxLength configuration.
*
* @throws IOException on test failure * @throws IOException on test failure
*/ */
@Test @Test
@ -211,11 +224,12 @@ public class FragmentExtensionTest
} }
/** /**
* Verify that outgoing text frames are fragmented by default configuration * Verify that outgoing text frames are not fragmented by default configuration (which has no maxLength specified)
*
* @throws IOException on test failure * @throws IOException on test failure
*/ */
@Test @Test
public void testOutgoingFramesDefaultConfig() throws IOException public void testOutgoingFramesDefaultConfig() throws Exception
{ {
OutgoingFramesCapture capture = new OutgoingFramesCapture(); OutgoingFramesCapture capture = new OutgoingFramesCapture();
@ -277,6 +291,7 @@ public class FragmentExtensionTest
/** /**
* Outgoing PING (Control Frame) should pass through extension unmodified * Outgoing PING (Control Frame) should pass through extension unmodified
*
* @throws IOException on test failure * @throws IOException on test failure
*/ */
@Test @Test
@ -312,4 +327,72 @@ public class FragmentExtensionTest
Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining())); Assert.assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice()); ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice());
} }
/**
* Ensure that FragmentExtension honors the correct order of websocket frames.
*
* @see <a href="https://github.com/eclipse/jetty.project/issues/2491">eclipse/jetty.project#2491</a>
*/
@Test
public void testLargeSmallTextAlternating() throws Exception
{
final int largeMessageSize = 60000;
byte buf[] = new byte[largeMessageSize];
Arrays.fill(buf, (byte) 'x');
String largeMessage = new String(buf, UTF_8);
final int fragmentCount = 10;
final int fragmentLength = largeMessageSize / fragmentCount;
final int messageCount = 10000;
FragmentExtension ext = new FragmentExtension();
ext.setBufferPool(bufferPool);
ext.setPolicy(WebSocketPolicy.newServerPolicy());
ExtensionConfig config = ExtensionConfig.parse("fragment;maxLength=" + fragmentLength);
ext.setConfig(config);
SaneFrameOrderingAssertion saneFrameOrderingAssertion = new SaneFrameOrderingAssertion();
ext.setNextOutgoingFrames(saneFrameOrderingAssertion);
CompletableFuture<Integer> enqueuedFrameCountFut = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
// Run Server Task
int frameCount = 0;
BatchMode batchMode = BatchMode.OFF;
try
{
for (int i = 0; i < messageCount; i++)
{
int messageId = i;
FutureWriteCallback callback = new FutureWriteCallback();
WebSocketFrame frame;
if (i % 2 == 0)
{
frame = new TextFrame().setPayload(largeMessage);
frameCount += fragmentCount;
}
else
{
frame = new TextFrame().setPayload("Short Message: " + i);
frameCount++;
}
ext.outgoingFrame(frame, callback, batchMode);
callback.get();
}
enqueuedFrameCountFut.complete(frameCount);
}
catch (Throwable t)
{
enqueuedFrameCountFut.completeExceptionally(t);
}
});
int enqueuedFrameCount = enqueuedFrameCountFut.get(5, SECONDS);
int expectedFrameCount = (messageCount/2) * fragmentCount; // large messages
expectedFrameCount += (messageCount/2); // + short messages
assertThat("Saw expected frame count", saneFrameOrderingAssertion.frameCount, is(expectedFrameCount));
assertThat("Enqueued expected frame count", enqueuedFrameCount, is(expectedFrameCount));
}
} }