Issue #3279 - Merge of FrameFlusher fix
Signed-off-by: Joakim Erdfelt <joakim@erdfelt.com>
This commit is contained in:
parent
3826397190
commit
ee1a7f613f
|
@ -26,7 +26,6 @@ import java.util.Deque;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
import org.eclipse.jetty.io.AbstractEndPoint;
|
|
||||||
import org.eclipse.jetty.io.ByteBufferPool;
|
import org.eclipse.jetty.io.ByteBufferPool;
|
||||||
import org.eclipse.jetty.io.EndPoint;
|
import org.eclipse.jetty.io.EndPoint;
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
|
@ -52,6 +51,7 @@ public class FrameFlusher extends IteratingCallback
|
||||||
private final List<Entry> entries;
|
private final List<Entry> entries;
|
||||||
private final List<ByteBuffer> buffers;
|
private final List<ByteBuffer> buffers;
|
||||||
private ByteBuffer batchBuffer = null;
|
private ByteBuffer batchBuffer = null;
|
||||||
|
private boolean canEnqueue = true;
|
||||||
private Throwable closedCause;
|
private Throwable closedCause;
|
||||||
|
|
||||||
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
|
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
|
||||||
|
@ -78,22 +78,48 @@ public class FrameFlusher extends IteratingCallback
|
||||||
{
|
{
|
||||||
Entry entry = new Entry(frame, callback, batch);
|
Entry entry = new Entry(frame, callback, batch);
|
||||||
byte opCode = frame.getOpCode();
|
byte opCode = frame.getOpCode();
|
||||||
Throwable failure = null;
|
|
||||||
|
Throwable dead;
|
||||||
|
|
||||||
synchronized (this)
|
synchronized (this)
|
||||||
{
|
{
|
||||||
if (closedCause != null)
|
if (canEnqueue)
|
||||||
failure = closedCause;
|
{
|
||||||
else if (opCode == OpCode.PING || opCode == OpCode.PONG)
|
dead = closedCause;
|
||||||
queue.offerFirst(entry);
|
if (dead == null)
|
||||||
|
{
|
||||||
|
if (opCode == OpCode.PING || opCode == OpCode.PONG)
|
||||||
|
{
|
||||||
|
queue.offerFirst(entry);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
queue.offerLast(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (opCode == OpCode.CLOSE)
|
||||||
|
{
|
||||||
|
this.canEnqueue = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
else
|
else
|
||||||
queue.offerLast(entry);
|
{
|
||||||
|
dead = new ClosedChannelException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (failure != null)
|
if (dead == null)
|
||||||
callback.failed(failure);
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
{
|
||||||
|
LOG.debug("Enqueued {} to {}", entry, this);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
return failure==null;
|
notifyCallbackFailure(callback, dead);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onClose(Throwable cause)
|
public void onClose(Throwable cause)
|
||||||
|
|
|
@ -0,0 +1,180 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.websocket.core.internal;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.ClosedChannelException;
|
||||||
|
import java.nio.channels.WritePendingException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.io.ByteBufferPool;
|
||||||
|
import org.eclipse.jetty.io.MappedByteBufferPool;
|
||||||
|
import org.eclipse.jetty.util.Callback;
|
||||||
|
import org.eclipse.jetty.util.FutureCallback;
|
||||||
|
import org.eclipse.jetty.websocket.core.CloseStatus;
|
||||||
|
import org.eclipse.jetty.websocket.core.Frame;
|
||||||
|
import org.eclipse.jetty.websocket.core.OpCode;
|
||||||
|
import org.eclipse.jetty.websocket.core.WebSocketConstants;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
public class FrameFlusherTest
|
||||||
|
{
|
||||||
|
public ByteBufferPool bufferPool = new MappedByteBufferPool();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure post-close frames have their associated callbacks properly notified.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testPostCloseFrameCallbacks() throws ExecutionException, InterruptedException, TimeoutException
|
||||||
|
{
|
||||||
|
Generator generator = new Generator(bufferPool);
|
||||||
|
CapturingEndPoint endPoint = new CapturingEndPoint(bufferPool);
|
||||||
|
int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;
|
||||||
|
int maxGather = 1;
|
||||||
|
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather);
|
||||||
|
|
||||||
|
Frame closeFrame = new Frame(OpCode.CLOSE).setPayload(CloseStatus.asPayloadBuffer(CloseStatus.MESSAGE_TOO_LARGE, "Message be to big"));
|
||||||
|
Frame textFrame = new Frame(OpCode.TEXT).setPayload("Hello").setFin(true);
|
||||||
|
|
||||||
|
FutureCallback closeCallback = new FutureCallback();
|
||||||
|
FutureCallback textFrameCallback = new FutureCallback();
|
||||||
|
|
||||||
|
assertTrue(frameFlusher.enqueue(closeFrame, closeCallback, false));
|
||||||
|
assertFalse(frameFlusher.enqueue(textFrame, textFrameCallback, false));
|
||||||
|
frameFlusher.iterate();
|
||||||
|
|
||||||
|
closeCallback.get(5, TimeUnit.SECONDS);
|
||||||
|
// If this throws a TimeoutException then the callback wasn't called.
|
||||||
|
ExecutionException x = assertThrows(ExecutionException.class,
|
||||||
|
()-> textFrameCallback.get(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(x.getCause(), instanceOf(ClosedChannelException.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure that FrameFlusher honors the correct order of websocket frames.
|
||||||
|
*
|
||||||
|
* @see <a href="https://github.com/eclipse/jetty.project/issues/2491">eclipse/jetty.project#2491</a>
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testLargeSmallText() throws ExecutionException, InterruptedException
|
||||||
|
{
|
||||||
|
Generator generator = new Generator(bufferPool);
|
||||||
|
CapturingEndPoint endPoint = new CapturingEndPoint(bufferPool);
|
||||||
|
int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;
|
||||||
|
int maxGather = 8;
|
||||||
|
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather);
|
||||||
|
|
||||||
|
int largeMessageSize = 60000;
|
||||||
|
byte[] buf = new byte[largeMessageSize];
|
||||||
|
Arrays.fill(buf, (byte) 'x');
|
||||||
|
String largeMessage = new String(buf, UTF_8);
|
||||||
|
|
||||||
|
int messageCount = 10000;
|
||||||
|
|
||||||
|
CompletableFuture<Void> serverTask = new CompletableFuture<>();
|
||||||
|
|
||||||
|
CompletableFuture.runAsync(() -> {
|
||||||
|
// Run Server Task
|
||||||
|
try
|
||||||
|
{
|
||||||
|
for (int i = 0; i < messageCount; i++)
|
||||||
|
{
|
||||||
|
FutureCallback callback = new FutureCallback();
|
||||||
|
Frame frame;
|
||||||
|
|
||||||
|
if (i % 2 == 0)
|
||||||
|
{
|
||||||
|
frame = new Frame(OpCode.TEXT).setPayload(largeMessage).setFin(true);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
frame = new Frame(OpCode.TEXT).setPayload("Short Message: " + i).setFin(true);
|
||||||
|
}
|
||||||
|
frameFlusher.enqueue(frame, callback, false);
|
||||||
|
frameFlusher.iterate();
|
||||||
|
callback.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Throwable t)
|
||||||
|
{
|
||||||
|
serverTask.completeExceptionally(t);
|
||||||
|
}
|
||||||
|
serverTask.complete(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
serverTask.get();
|
||||||
|
System.out.printf("Received: %,d frames%n", endPoint.incomingFrames.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class CapturingEndPoint extends MockEndpoint
|
||||||
|
{
|
||||||
|
public Parser parser;
|
||||||
|
public LinkedBlockingQueue<Frame> incomingFrames = new LinkedBlockingQueue<>();
|
||||||
|
|
||||||
|
public CapturingEndPoint(ByteBufferPool bufferPool)
|
||||||
|
{
|
||||||
|
parser = new Parser(bufferPool);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdownOutput()
|
||||||
|
{
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
|
||||||
|
{
|
||||||
|
Objects.requireNonNull(callback);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
for (ByteBuffer buffer : buffers)
|
||||||
|
{
|
||||||
|
Parser.ParsedFrame frame = parser.parse(buffer);
|
||||||
|
if(frame != null)
|
||||||
|
{
|
||||||
|
incomingFrames.offer(frame);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
callback.succeeded();
|
||||||
|
}
|
||||||
|
catch (WritePendingException e)
|
||||||
|
{
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
catch (Throwable t)
|
||||||
|
{
|
||||||
|
callback.failed(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,178 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.websocket.core.internal;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.ReadPendingException;
|
||||||
|
import java.nio.channels.WritePendingException;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.io.Connection;
|
||||||
|
import org.eclipse.jetty.io.EndPoint;
|
||||||
|
import org.eclipse.jetty.util.Callback;
|
||||||
|
|
||||||
|
public class MockEndpoint implements EndPoint
|
||||||
|
{
|
||||||
|
public static final String NOT_SUPPORTED = "Not supported by MockEndPoint";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InetSocketAddress getLocalAddress()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InetSocketAddress getRemoteAddress()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isOpen()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getCreatedTimeStamp()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdownOutput()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isOutputShutdown()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isInputShutdown()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close(Throwable cause)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int fill(ByteBuffer buffer) throws IOException
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean flush(ByteBuffer... buffer) throws IOException
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getTransport()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getIdleTimeout()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setIdleTimeout(long idleTimeout)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void fillInterested(Callback callback) throws ReadPendingException
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryFillInterested(Callback callback)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isFillInterested()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Connection getConnection()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setConnection(Connection connection)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onOpen()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Throwable cause)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isOptimizedForDirectBuffers()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void upgrade(Connection newConnection)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException(NOT_SUPPORTED);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue