Issue #3374 - websocket write timeout improvements

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-04-23 10:13:20 +10:00 committed by Greg Wilkins
parent 92e38fcb42
commit 1ef191ffcd
8 changed files with 148 additions and 115 deletions

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.util;
import java.io.File;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
@ -33,6 +32,7 @@ import java.security.ProtectionDomain;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -732,4 +732,22 @@ public class TypeUtil
}
return null;
}
public static <T> Iterator<T> concat(Iterator<T> i1, Iterator<T> i2)
{
return new Iterator<>()
{
@Override
public boolean hasNext()
{
return i1.hasNext() || i2.hasNext();
}
@Override
public T next()
{
return i1.hasNext() ? i1.next() : i2.next();
}
};
}
}

View File

@ -409,7 +409,6 @@ public interface FrameHandler extends IncomingFrames
@Override
public void setWriteIdleTimeout(Duration timeout)
{
}
@Override

View File

@ -1,3 +1,21 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
public class WebSocketWriteTimeoutException extends WebSocketTimeoutException

View File

@ -53,6 +53,7 @@ import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
@ -349,7 +350,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
WebSocketChannel wsChannel = newWebSocketChannel(frameHandler, negotiated);
wsClient.customize(wsChannel);
WebSocketConnection wsConnection = newWebSocketConnection(endp, httpClient.getExecutor(), httpClient.getByteBufferPool(), wsChannel);
WebSocketConnection wsConnection = newWebSocketConnection(endp, httpClient.getExecutor(), httpClient.getScheduler(), httpClient.getByteBufferPool(), wsChannel);
for (Connection.Listener listener : wsClient.getBeans(Connection.Listener.class))
wsConnection.addListener(listener);
@ -379,9 +380,9 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
{
}
protected WebSocketConnection newWebSocketConnection(EndPoint endp, Executor executor, ByteBufferPool byteBufferPool, WebSocketChannel wsChannel)
protected WebSocketConnection newWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketChannel wsChannel)
{
return new WebSocketConnection(endp, executor, byteBufferPool, wsChannel);
return new WebSocketConnection(endp, executor, scheduler, byteBufferPool, wsChannel);
}
protected WebSocketChannel newWebSocketChannel(FrameHandler handler, Negotiated negotiated)

View File

@ -23,20 +23,20 @@ import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
@ -54,9 +54,12 @@ public class FrameFlusher extends IteratingCallback
private final Generator generator;
private final int maxGather;
private final Deque<Entry> queue = new ArrayDeque<>();
private final List<Entry> entries;
private final List<ByteBuffer> buffers;
private final Timeout timeout;
private final Scheduler timeoutScheduler;
private final List<Entry> entries;
private final List<Entry> previousEntries;
private final List<Entry> failedEntries;
private ByteBuffer batchBuffer = null;
private boolean canEnqueue = true;
private Throwable closedCause;
@ -64,7 +67,7 @@ public class FrameFlusher extends IteratingCallback
private LongAdder bytesOut = new LongAdder();
private long idleTimeout = 0;
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
public FrameFlusher(ByteBufferPool bufferPool, Scheduler scheduler, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
{
this.bufferPool = bufferPool;
this.endPoint = endPoint;
@ -72,18 +75,10 @@ public class FrameFlusher extends IteratingCallback
this.generator = Objects.requireNonNull(generator);
this.maxGather = maxGather;
this.entries = new ArrayList<>(maxGather);
this.previousEntries = new ArrayList<>(maxGather);
this.failedEntries = new ArrayList<>(maxGather);
this.buffers = new ArrayList<>((maxGather * 2) + 1);
ScheduledExecutorScheduler executor = new ScheduledExecutorScheduler();
try
{
executor.start();
}
catch (Exception e)
{
e.printStackTrace();
}
this.timeout = new Timeout(executor);
this.timeoutScheduler = scheduler;
}
/**
@ -113,8 +108,8 @@ public class FrameFlusher extends IteratingCallback
else
queue.offerLast(entry);
if (idleTimeout > 0)
timeout.schedule(entry);
if ((idleTimeout > 0) && (queue.size()==1) && entries.isEmpty())
timeoutScheduler.schedule(this::timeoutExpired, idleTimeout, TimeUnit.MILLISECONDS);
if (opCode == OpCode.CLOSE)
this.canEnqueue = false;
@ -160,10 +155,9 @@ public class FrameFlusher extends IteratingCallback
if (closedCause != null)
throw closedCause;
// Succeed entries from previous call to process
// and clear batchBuffer if we wrote it.
if (succeedEntries() && batchBuffer != null)
BufferUtil.clear(batchBuffer);
// Remember entries to succeed from previous process
previousEntries.addAll(entries);
entries.clear();
while (!queue.isEmpty() && entries.size() <= maxGather)
{
@ -233,10 +227,20 @@ public class FrameFlusher extends IteratingCallback
BufferUtil.toDetailString(batchBuffer),
entries);
// succeed previous entries
for (Entry entry : previousEntries)
{
if (entry.frame.getOpCode() == OpCode.CLOSE)
endPoint.shutdownOutput();
notifyCallbackSuccess(entry.callback);
entry.release();
}
previousEntries.clear();
// If we did not get any new entries go to IDLE state
if (entries.isEmpty())
{
releaseAggregate();
succeedEntries();
return Action.IDLE;
}
@ -272,27 +276,47 @@ public class FrameFlusher extends IteratingCallback
}
}
@Override
public void succeeded()
public void timeoutExpired()
{
succeedEntries();
super.succeeded();
}
private boolean succeedEntries()
{
// todo: synchronize?
boolean hadEntries = false;
for (Entry entry : entries)
boolean failed = false;
synchronized (FrameFlusher.this)
{
hadEntries = true;
if (entry.frame.getOpCode() == OpCode.CLOSE)
endPoint.shutdownOutput();
notifyCallbackSuccess(entry.callback);
entry.release();
if (closedCause != null)
return;
long time = System.currentTimeMillis();
long nextTimeout = -1;
Iterator<Entry> iterator = TypeUtil.concat(entries.iterator(), queue.iterator());
while (iterator.hasNext())
{
Entry entry = iterator.next();
long timeSinceCreation = time - entry.getTimeOfCreation();
if (timeSinceCreation >= idleTimeout)
{
LOG.warn("FrameFlusher write timeout on entry: {}", entry);
failed = true;
canEnqueue = false;
closedCause = new WebSocketWriteTimeoutException("FrameFlusher Write Timeout");
failedEntries.addAll(entries);
failedEntries.addAll(queue);
entries.clear();
queue.clear();
break;
}
if (timeSinceCreation > nextTimeout)
nextTimeout = timeSinceCreation;
}
if (!failed && idleTimeout>0 && !entries.isEmpty())
timeoutScheduler.schedule(this::timeoutExpired, nextTimeout, TimeUnit.MILLISECONDS);
}
entries.clear();
return hadEntries;
if (failed)
this.iterate();
}
@Override
@ -302,22 +326,26 @@ public class FrameFlusher extends IteratingCallback
releaseAggregate();
synchronized (this)
{
entries.addAll(queue);
failedEntries.addAll(queue);
queue.clear();
failedEntries.addAll(entries);
entries.clear();
if (closedCause == null)
closedCause = failure;
else if (closedCause != failure)
closedCause.addSuppressed(failure);
}
for (Entry entry : entries)
for (Entry entry : failedEntries)
{
notifyCallbackFailure(entry.callback, failure);
entry.release();
}
entries.clear();
endPoint.close(failure);
failedEntries.clear();
endPoint.close(closedCause);
}
private void releaseAggregate()
@ -432,56 +460,4 @@ public class FrameFlusher extends IteratingCallback
return String.format("%s{%s,%s,%b}", getClass().getSimpleName(), frame, callback, batch);
}
}
public class Timeout extends CyclicTimeout
{
/**
* @param scheduler A scheduler used to schedule wakeups
*/
public Timeout(Scheduler scheduler)
{
super(scheduler);
}
@Override
public void onTimeoutExpired()
{
LOG.warn("idle timeout expired");
synchronized (FrameFlusher.this)
{
long time = System.currentTimeMillis();
long nextTimeout = -1;
for (Entry e : entries)
{
long timeSinceCreation = time - e.getTimeOfCreation();
if (timeSinceCreation > idleTimeout)
{
LOG.warn("FrameFlusher write timeout on entry: {}", e);
// TODO: can we fail more gracefully, this exception must be tested in processConnectionError to avoid trying to write again
// TODO: is failed() good enough, what happens if the iterating callback goes into state
failed(new WebSocketWriteTimeoutException("FrameFlusher Write Timeout"));
return;
}
if (timeSinceCreation > nextTimeout)
nextTimeout = timeSinceCreation;
}
if (!entries.isEmpty())
schedule(nextTimeout, TimeUnit.MILLISECONDS);
}
}
public void schedule(Entry entry)
{
synchronized (FrameFlusher.this)
{
if (entries.isEmpty())
schedule(idleTimeout, TimeUnit.MILLISECONDS);
}
}
}
}

View File

@ -36,6 +36,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.MessageTooLargeException;
@ -79,10 +80,11 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
*/
public WebSocketConnection(EndPoint endp,
Executor executor,
Scheduler scheduler,
ByteBufferPool bufferPool,
WebSocketChannel channel)
{
this(endp, executor, bufferPool, channel, true);
this(endp, executor, scheduler, bufferPool, channel, true);
}
/**
@ -94,6 +96,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
*/
public WebSocketConnection(EndPoint endp,
Executor executor,
Scheduler scheduler,
ByteBufferPool bufferPool,
WebSocketChannel channel,
boolean validating)
@ -122,7 +125,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
};
this.flusher = new Flusher(channel.getOutputBufferSize(), generator, endp);
this.flusher = new Flusher(scheduler, channel.getOutputBufferSize(), generator, endp);
this.setInputBufferSize(channel.getInputBufferSize());
this.random = this.channel.getBehavior() == Behavior.CLIENT?new Random(endp.hashCode()):null;
@ -596,9 +599,9 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
private class Flusher extends FrameFlusher
{
private Flusher(int bufferSize, Generator generator, EndPoint endpoint)
private Flusher(Scheduler scheduler, int bufferSize, Generator generator, EndPoint endpoint)
{
super(bufferPool, generator, endpoint, bufferSize, 8);
super(bufferPool, scheduler, generator, endpoint, bufferSize, 8);
}
@Override

View File

@ -43,6 +43,7 @@ import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
@ -201,7 +202,7 @@ public final class RFC6455Handshaker implements Handshaker
LOG.debug("channel {}", channel);
// Create a connection
WebSocketConnection connection = newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getByteBufferPool(), channel);
WebSocketConnection connection = newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), channel);
if (LOG.isDebugEnabled())
LOG.debug("connection {}", connection);
if (connection == null)
@ -241,9 +242,9 @@ public final class RFC6455Handshaker implements Handshaker
return new WebSocketChannel(handler, Behavior.SERVER, negotiated);
}
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, WebSocketChannel wsChannel)
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketChannel wsChannel)
{
return new WebSocketConnection(endPoint, executor, byteBufferPool, wsChannel);
return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, wsChannel);
}
private boolean getSendServerVersion(Connector connector)

View File

@ -37,11 +37,15 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketWriteTimeoutException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
@ -53,7 +57,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class FrameFlusherTest
{
public ByteBufferPool bufferPool = new MappedByteBufferPool();
private ByteBufferPool bufferPool = new MappedByteBufferPool();
private Scheduler scheduler;
@BeforeEach
public void start() throws Exception
{
scheduler = new ScheduledExecutorScheduler();
scheduler.start();
}
@AfterEach
public void stop() throws Exception
{
scheduler.stop();
}
/**
* Ensure post-close frames have their associated callbacks properly notified.
@ -65,7 +83,7 @@ public class FrameFlusherTest
CapturingEndPoint endPoint = new CapturingEndPoint(bufferPool);
int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;
int maxGather = 1;
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather);
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, scheduler, generator, endPoint, bufferSize, maxGather);
Frame closeFrame = new Frame(OpCode.CLOSE).setPayload(CloseStatus.asPayloadBuffer(CloseStatus.MESSAGE_TOO_LARGE, "Message be to big"));
Frame textFrame = new Frame(OpCode.TEXT).setPayload("Hello").setFin(true);
@ -96,7 +114,7 @@ public class FrameFlusherTest
CapturingEndPoint endPoint = new CapturingEndPoint(bufferPool);
int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;
int maxGather = 8;
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather);
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, scheduler, generator, endPoint, bufferSize, maxGather);
int largeMessageSize = 60000;
byte[] buf = new byte[largeMessageSize];
@ -147,10 +165,9 @@ public class FrameFlusherTest
int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;
int maxGather = 8;
CountDownLatch flusherFailure = new CountDownLatch(1);
AtomicReference<Throwable> error = new AtomicReference<>();
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather)
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, scheduler, generator, endPoint, bufferSize, maxGather)
{
@Override
public void onCompleteFailure(Throwable failure)
@ -161,7 +178,7 @@ public class FrameFlusherTest
}
};
frameFlusher.setIdleTimeout(150);
frameFlusher.setIdleTimeout(100);
endPoint.setBlockTime(200);
Frame frame = new Frame(OpCode.TEXT).setPayload("message").setFin(true);