diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/SaneFrameOrderingAssertion.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/SaneFrameOrderingAssertion.java
new file mode 100644
index 00000000000..b8aeb16dfd6
--- /dev/null
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/SaneFrameOrderingAssertion.java
@@ -0,0 +1,74 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.websocket.common;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.WriteCallback;
+import org.eclipse.jetty.websocket.api.extensions.Frame;
+import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
+
+/**
+ * Useful for testing the production of sane frame ordering from various components.
+ */
+public class SaneFrameOrderingAssertion implements OutgoingFrames
+{
+ boolean priorDataFrame = false;
+
+ @Override
+ public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
+ {
+ byte opcode = frame.getOpCode();
+ assertThat("OpCode.isKnown(" + opcode + ")", OpCode.isKnown(opcode), is(true));
+
+ switch (opcode)
+ {
+ case OpCode.TEXT:
+ assertFalse("Unexpected " + OpCode.name(opcode) + " frame, was expecting CONTINUATION", priorDataFrame);
+ break;
+ case OpCode.BINARY:
+ assertFalse("Unexpected " + OpCode.name(opcode) + " frame, was expecting CONTINUATION", priorDataFrame);
+ break;
+ case OpCode.CONTINUATION:
+ assertTrue("CONTINUATION frame without prior !FIN", priorDataFrame);
+ break;
+ case OpCode.CLOSE:
+ assertFalse("Fragmented Close Frame [" + OpCode.name(opcode) + "]", frame.isFin());
+ break;
+ case OpCode.PING:
+ assertFalse("Fragmented Close Frame [" + OpCode.name(opcode) + "]", frame.isFin());
+ break;
+ case OpCode.PONG:
+ assertFalse("Fragmented Close Frame [" + OpCode.name(opcode) + "]", frame.isFin());
+ break;
+ }
+
+ if (OpCode.isDataFrame(frame.getOpCode()))
+ {
+ priorDataFrame = !frame.isFin();
+ }
+
+ if (callback != null)
+ callback.writeSuccess();
+ }
+}
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpointTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpointTest.java
index 9780e7b8c15..7e9944ac161 100644
--- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpointTest.java
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpointTest.java
@@ -18,13 +18,23 @@
package org.eclipse.jetty.websocket.common;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
+import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.io.LocalWebSocketConnection;
import org.eclipse.jetty.websocket.common.test.OutgoingFramesCapture;
import org.junit.Assert;
@@ -57,7 +67,7 @@ public class WebSocketRemoteEndpointTest
ByteBuffer bytes = ByteBuffer.wrap(new byte[]
{ 0, 1, 2 });
remote.sendPartialBytes(bytes,false);
- Assert.fail("Expected " + IllegalStateException.class.getName());
+ fail("Expected " + IllegalStateException.class.getName());
}
catch (IllegalStateException e)
{
@@ -88,4 +98,37 @@ public class WebSocketRemoteEndpointTest
// End text message
remote.sendPartialString("World!",true);
}
+
+ /**
+ * Ensure that WebSocketRemoteEndpoint honors the correct order of websocket frames.
+ *
+ * @see eclipse/jetty.project#2491
+ */
+ @Test
+ public void testLargeSmallText() throws ExecutionException, InterruptedException
+ {
+ LocalWebSocketConnection conn = new LocalWebSocketConnection(testname, bufferPool);
+ OutgoingFrames orderingAssert = new SaneFrameOrderingAssertion();
+ WebSocketRemoteEndpoint remote = new WebSocketRemoteEndpoint(conn, orderingAssert);
+ conn.connect();
+ conn.open();
+
+ int largeMessageSize = 60000;
+ byte buf[] = new byte[largeMessageSize];
+ Arrays.fill(buf, (byte) 'x');
+ String largeMessage = new String(buf, UTF_8);
+
+ int messageCount = 10000;
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ Future fut;
+ if (i % 2 == 0)
+ fut = remote.sendStringByFuture(largeMessage);
+ else
+ fut = remote.sendStringByFuture("Short Message: " + i);
+ fut.get();
+ }
+ }
+
}
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/FrameFlusherTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/FrameFlusherTest.java
new file mode 100644
index 00000000000..283f728a6d0
--- /dev/null
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/FrameFlusherTest.java
@@ -0,0 +1,168 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.websocket.common.io;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadPendingException;
+import java.nio.channels.WritePendingException;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.io.MappedByteBufferPool;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.FutureCallback;
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.WebSocketPolicy;
+import org.eclipse.jetty.websocket.api.WriteCallback;
+import org.eclipse.jetty.websocket.api.extensions.Frame;
+import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
+import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
+import org.eclipse.jetty.websocket.common.Generator;
+import org.eclipse.jetty.websocket.common.Parser;
+import org.eclipse.jetty.websocket.common.SaneFrameOrderingAssertion;
+import org.eclipse.jetty.websocket.common.WebSocketFrame;
+import org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint;
+import org.eclipse.jetty.websocket.common.frames.TextFrame;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class FrameFlusherTest
+{
+ @Rule
+ public TestName testname = new TestName();
+
+ public ByteBufferPool bufferPool = new MappedByteBufferPool();
+
+ /**
+ * Ensure that FrameFlusher honors the correct order of websocket frames.
+ *
+ * @see eclipse/jetty.project#2491
+ */
+ @Test
+ public void testLargeSmallText() throws ExecutionException, InterruptedException
+ {
+ WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
+ Generator generator = new Generator(policy, bufferPool);
+ SaneFrameOrderingEndPoint endPoint = new SaneFrameOrderingEndPoint(WebSocketPolicy.newClientPolicy(), bufferPool);
+ int bufferSize = policy.getMaxBinaryMessageBufferSize();
+ int maxGather = 8;
+ FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather);
+
+ int largeMessageSize = 60000;
+ byte buf[] = new byte[largeMessageSize];
+ Arrays.fill(buf, (byte) 'x');
+ String largeMessage = new String(buf, UTF_8);
+
+ int messageCount = 10000;
+ BatchMode batchMode = BatchMode.OFF;
+
+ CompletableFuture serverTask = new CompletableFuture<>();
+
+ CompletableFuture.runAsync(() -> {
+ // Run Server Task
+ try
+ {
+ for (int i = 0; i < messageCount; i++)
+ {
+ FutureWriteCallback callback = new FutureWriteCallback();
+ WebSocketFrame frame;
+
+ if (i % 2 == 0)
+ frame = new TextFrame().setPayload(largeMessage);
+ else
+ frame = new TextFrame().setPayload("Short Message: " + i);
+ frameFlusher.enqueue(frame, callback, batchMode);
+ callback.get();
+ }
+ }
+ catch (Throwable t)
+ {
+ serverTask.completeExceptionally(t);
+ }
+ serverTask.complete(null);
+ });
+
+ serverTask.get();
+ System.out.printf("Received: %,d frames / %,d errors%n", endPoint.incomingFrames, endPoint.incomingErrors);
+ }
+
+ public static class SaneFrameOrderingEndPoint extends MockEndPoint implements IncomingFrames
+ {
+ public Parser parser;
+ public int incomingFrames;
+ public int incomingErrors;
+
+ public SaneFrameOrderingEndPoint(WebSocketPolicy policy, ByteBufferPool bufferPool)
+ {
+ parser = new Parser(policy, bufferPool);
+ parser.setIncomingFramesHandler(this);
+ }
+
+ @Override
+ public void incomingError(Throwable t)
+ {
+ incomingErrors++;
+ }
+
+ @Override
+ public void incomingFrame(Frame frame)
+ {
+ incomingFrames++;
+ }
+
+ @Override
+ public void shutdownOutput()
+ {
+ // ignore
+ }
+
+ @Override
+ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
+ {
+ try
+ {
+ for (ByteBuffer buffer : buffers)
+ {
+ parser.parse(buffer);
+ }
+ if (callback != null)
+ callback.succeeded();
+ }
+ catch (WritePendingException e)
+ {
+ throw e;
+ }
+ catch (Throwable t)
+ {
+ if (callback != null)
+ callback.failed(t);
+ }
+ }
+ }
+}
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/MockEndPoint.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/MockEndPoint.java
new file mode 100644
index 00000000000..b31f85e8989
--- /dev/null
+++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/MockEndPoint.java
@@ -0,0 +1,172 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.websocket.common.io;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadPendingException;
+import java.nio.channels.WritePendingException;
+
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.util.Callback;
+
+public class MockEndPoint implements EndPoint
+{
+ public static final String NOT_SUPPORTED = "Not supported by MockEndPoint";
+
+ @Override
+ public InetSocketAddress getLocalAddress()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public InetSocketAddress getRemoteAddress()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public long getCreatedTimeStamp()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public void shutdownOutput()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean isOutputShutdown()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean isInputShutdown()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public void close()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public int fill(ByteBuffer buffer) throws IOException
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean flush(ByteBuffer... buffer) throws IOException
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public Object getTransport()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public long getIdleTimeout()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public void setIdleTimeout(long idleTimeout)
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public void fillInterested(Callback callback) throws ReadPendingException
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean tryFillInterested(Callback callback)
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean isFillInterested()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public Connection getConnection()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public void setConnection(Connection connection)
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public void onOpen()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public void onClose()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public boolean isOptimizedForDirectBuffers()
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+
+ @Override
+ public void upgrade(Connection newConnection)
+ {
+ throw new UnsupportedOperationException(NOT_SUPPORTED);
+ }
+}