Issue #2491 - Fragmented Messages arrive out of order.
+ Adding testcase for RemoteEndpoint + Adding testcase for FrameFlusher Signed-off-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
This commit is contained in:
parent
0f238baaf7
commit
5add914942
|
@ -0,0 +1,74 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
|
||||
/**
|
||||
* Useful for testing the production of sane frame ordering from various components.
|
||||
*/
|
||||
public class SaneFrameOrderingAssertion implements OutgoingFrames
|
||||
{
|
||||
boolean priorDataFrame = false;
|
||||
|
||||
@Override
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
|
||||
{
|
||||
byte opcode = frame.getOpCode();
|
||||
assertThat("OpCode.isKnown(" + opcode + ")", OpCode.isKnown(opcode), is(true));
|
||||
|
||||
switch (opcode)
|
||||
{
|
||||
case OpCode.TEXT:
|
||||
assertFalse("Unexpected " + OpCode.name(opcode) + " frame, was expecting CONTINUATION", priorDataFrame);
|
||||
break;
|
||||
case OpCode.BINARY:
|
||||
assertFalse("Unexpected " + OpCode.name(opcode) + " frame, was expecting CONTINUATION", priorDataFrame);
|
||||
break;
|
||||
case OpCode.CONTINUATION:
|
||||
assertTrue("CONTINUATION frame without prior !FIN", priorDataFrame);
|
||||
break;
|
||||
case OpCode.CLOSE:
|
||||
assertFalse("Fragmented Close Frame [" + OpCode.name(opcode) + "]", frame.isFin());
|
||||
break;
|
||||
case OpCode.PING:
|
||||
assertFalse("Fragmented Close Frame [" + OpCode.name(opcode) + "]", frame.isFin());
|
||||
break;
|
||||
case OpCode.PONG:
|
||||
assertFalse("Fragmented Close Frame [" + OpCode.name(opcode) + "]", frame.isFin());
|
||||
break;
|
||||
}
|
||||
|
||||
if (OpCode.isDataFrame(frame.getOpCode()))
|
||||
{
|
||||
priorDataFrame = !frame.isFin();
|
||||
}
|
||||
|
||||
if (callback != null)
|
||||
callback.writeSuccess();
|
||||
}
|
||||
}
|
|
@ -18,13 +18,23 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.MappedByteBufferPool;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.io.LocalWebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.common.test.OutgoingFramesCapture;
|
||||
import org.junit.Assert;
|
||||
|
@ -57,7 +67,7 @@ public class WebSocketRemoteEndpointTest
|
|||
ByteBuffer bytes = ByteBuffer.wrap(new byte[]
|
||||
{ 0, 1, 2 });
|
||||
remote.sendPartialBytes(bytes,false);
|
||||
Assert.fail("Expected " + IllegalStateException.class.getName());
|
||||
fail("Expected " + IllegalStateException.class.getName());
|
||||
}
|
||||
catch (IllegalStateException e)
|
||||
{
|
||||
|
@ -88,4 +98,37 @@ public class WebSocketRemoteEndpointTest
|
|||
// End text message
|
||||
remote.sendPartialString("World!",true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure that WebSocketRemoteEndpoint honors the correct order of websocket frames.
|
||||
*
|
||||
* @see <a href="https://github.com/eclipse/jetty.project/issues/2491">eclipse/jetty.project#2491</a>
|
||||
*/
|
||||
@Test
|
||||
public void testLargeSmallText() throws ExecutionException, InterruptedException
|
||||
{
|
||||
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname, bufferPool);
|
||||
OutgoingFrames orderingAssert = new SaneFrameOrderingAssertion();
|
||||
WebSocketRemoteEndpoint remote = new WebSocketRemoteEndpoint(conn, orderingAssert);
|
||||
conn.connect();
|
||||
conn.open();
|
||||
|
||||
int largeMessageSize = 60000;
|
||||
byte buf[] = new byte[largeMessageSize];
|
||||
Arrays.fill(buf, (byte) 'x');
|
||||
String largeMessage = new String(buf, UTF_8);
|
||||
|
||||
int messageCount = 10000;
|
||||
|
||||
for (int i = 0; i < messageCount; i++)
|
||||
{
|
||||
Future<Void> fut;
|
||||
if (i % 2 == 0)
|
||||
fut = remote.sendStringByFuture(largeMessage);
|
||||
else
|
||||
fut = remote.sendStringByFuture("Short Message: " + i);
|
||||
fut.get();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,168 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common.io;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ReadPendingException;
|
||||
import java.nio.channels.WritePendingException;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.MappedByteBufferPool;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.Generator;
|
||||
import org.eclipse.jetty.websocket.common.Parser;
|
||||
import org.eclipse.jetty.websocket.common.SaneFrameOrderingAssertion;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
public class FrameFlusherTest
|
||||
{
|
||||
@Rule
|
||||
public TestName testname = new TestName();
|
||||
|
||||
public ByteBufferPool bufferPool = new MappedByteBufferPool();
|
||||
|
||||
/**
|
||||
* Ensure that FrameFlusher honors the correct order of websocket frames.
|
||||
*
|
||||
* @see <a href="https://github.com/eclipse/jetty.project/issues/2491">eclipse/jetty.project#2491</a>
|
||||
*/
|
||||
@Test
|
||||
public void testLargeSmallText() throws ExecutionException, InterruptedException
|
||||
{
|
||||
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
|
||||
Generator generator = new Generator(policy, bufferPool);
|
||||
SaneFrameOrderingEndPoint endPoint = new SaneFrameOrderingEndPoint(WebSocketPolicy.newClientPolicy(), bufferPool);
|
||||
int bufferSize = policy.getMaxBinaryMessageBufferSize();
|
||||
int maxGather = 8;
|
||||
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather);
|
||||
|
||||
int largeMessageSize = 60000;
|
||||
byte buf[] = new byte[largeMessageSize];
|
||||
Arrays.fill(buf, (byte) 'x');
|
||||
String largeMessage = new String(buf, UTF_8);
|
||||
|
||||
int messageCount = 10000;
|
||||
BatchMode batchMode = BatchMode.OFF;
|
||||
|
||||
CompletableFuture<Void> serverTask = new CompletableFuture<>();
|
||||
|
||||
CompletableFuture.runAsync(() -> {
|
||||
// Run Server Task
|
||||
try
|
||||
{
|
||||
for (int i = 0; i < messageCount; i++)
|
||||
{
|
||||
FutureWriteCallback callback = new FutureWriteCallback();
|
||||
WebSocketFrame frame;
|
||||
|
||||
if (i % 2 == 0)
|
||||
frame = new TextFrame().setPayload(largeMessage);
|
||||
else
|
||||
frame = new TextFrame().setPayload("Short Message: " + i);
|
||||
frameFlusher.enqueue(frame, callback, batchMode);
|
||||
callback.get();
|
||||
}
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
serverTask.completeExceptionally(t);
|
||||
}
|
||||
serverTask.complete(null);
|
||||
});
|
||||
|
||||
serverTask.get();
|
||||
System.out.printf("Received: %,d frames / %,d errors%n", endPoint.incomingFrames, endPoint.incomingErrors);
|
||||
}
|
||||
|
||||
public static class SaneFrameOrderingEndPoint extends MockEndPoint implements IncomingFrames
|
||||
{
|
||||
public Parser parser;
|
||||
public int incomingFrames;
|
||||
public int incomingErrors;
|
||||
|
||||
public SaneFrameOrderingEndPoint(WebSocketPolicy policy, ByteBufferPool bufferPool)
|
||||
{
|
||||
parser = new Parser(policy, bufferPool);
|
||||
parser.setIncomingFramesHandler(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incomingError(Throwable t)
|
||||
{
|
||||
incomingErrors++;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incomingFrame(Frame frame)
|
||||
{
|
||||
incomingFrames++;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdownOutput()
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
|
||||
{
|
||||
try
|
||||
{
|
||||
for (ByteBuffer buffer : buffers)
|
||||
{
|
||||
parser.parse(buffer);
|
||||
}
|
||||
if (callback != null)
|
||||
callback.succeeded();
|
||||
}
|
||||
catch (WritePendingException e)
|
||||
{
|
||||
throw e;
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
if (callback != null)
|
||||
callback.failed(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,172 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ReadPendingException;
|
||||
import java.nio.channels.WritePendingException;
|
||||
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
|
||||
public class MockEndPoint implements EndPoint
|
||||
{
|
||||
public static final String NOT_SUPPORTED = "Not supported by MockEndPoint";
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getLocalAddress()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getRemoteAddress()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCreatedTimeStamp()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdownOutput()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOutputShutdown()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInputShutdown()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int fill(ByteBuffer buffer) throws IOException
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean flush(ByteBuffer... buffer) throws IOException
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getTransport()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getIdleTimeout()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIdleTimeout(long idleTimeout)
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fillInterested(Callback callback) throws ReadPendingException
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryFillInterested(Callback callback)
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFillInterested()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConnection(Connection connection)
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOptimizedForDirectBuffers()
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void upgrade(Connection newConnection)
|
||||
{
|
||||
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue