Issue #845 - Improve blocking IO for data rate limiting.

Moved tests to run HTTP and HTTP/2 tests, and added more test cases.
This commit is contained in:
Simone Bordet 2016-08-31 12:15:31 +02:00
parent 0322a1640d
commit 705a68dfc4
15 changed files with 1027 additions and 268 deletions

View File

@ -50,6 +50,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxHeaderBlockFragment = 0; private int maxHeaderBlockFragment = 0;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private ExecutionStrategy.Factory executionStrategyFactory = new ProduceExecuteConsume.Factory(); private ExecutionStrategy.Factory executionStrategyFactory = new ProduceExecuteConsume.Factory();
private long streamIdleTimeout;
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration) public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{ {
@ -157,6 +158,17 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.executionStrategyFactory = executionStrategyFactory; this.executionStrategyFactory = executionStrategyFactory;
} }
@ManagedAttribute("The stream idle timeout in milliseconds")
public long getStreamIdleTimeout()
{
return streamIdleTimeout;
}
public void setStreamIdleTimeout(long streamIdleTimeout)
{
this.streamIdleTimeout = streamIdleTimeout;
}
public HttpConfiguration getHttpConfiguration() public HttpConfiguration getHttpConfiguration()
{ {
return httpConfiguration; return httpConfiguration;
@ -177,8 +189,11 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
// For a single stream in a connection, there will be a race between // For a single stream in a connection, there will be a race between
// the stream idle timeout and the connection idle timeout. However, // the stream idle timeout and the connection idle timeout. However,
// the typical case is that the connection will be busier and the // the typical case is that the connection will be busier and the
// stream idle timeout will expire earlier that the connection's. // stream idle timeout will expire earlier than the connection's.
session.setStreamIdleTimeout(endPoint.getIdleTimeout()); long streamIdleTimeout = getStreamIdleTimeout();
if (streamIdleTimeout <= 0)
streamIdleTimeout = endPoint.getIdleTimeout();
session.setStreamIdleTimeout(streamIdleTimeout);
session.setInitialSessionRecvWindow(getInitialSessionRecvWindow()); session.setInitialSessionRecvWindow(getInitialSessionRecvWindow());
ServerParser parser = newServerParser(connector, session); ServerParser parser = newServerParser(connector, session);

View File

@ -133,9 +133,9 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
public boolean onStreamTimeout(IStream stream, Throwable failure) public boolean onStreamTimeout(IStream stream, Throwable failure)
{ {
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
boolean result = !channel.isRequestHandled(); boolean result = channel.onStreamTimeout(failure);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processing" : "Ignoring", stream, failure); LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", stream, failure);
return result; return result;
} }
@ -157,7 +157,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
result &= !channel.isRequestHandled(); result &= !channel.isRequestHandled();
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processing" : "Ignoring", session, failure); LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure);
return result; return result;
} }

View File

@ -267,11 +267,11 @@ public class HttpChannelOverHTTP2 extends HttpChannel
handle); handle);
} }
boolean delayed = _delayedUntilContent; boolean wasDelayed = _delayedUntilContent;
_delayedUntilContent = false; _delayedUntilContent = false;
if (delayed) if (wasDelayed)
_handled = true; _handled = true;
return handle || delayed ? this : null; return handle || wasDelayed ? this : null;
} }
public boolean isRequestHandled() public boolean isRequestHandled()
@ -279,6 +279,21 @@ public class HttpChannelOverHTTP2 extends HttpChannel
return _handled; return _handled;
} }
public boolean onStreamTimeout(Throwable failure)
{
if (!_handled)
return true;
HttpInput input = getRequest().getHttpInput();
boolean readFailed = input.failed(failure);
if (readFailed)
handle();
boolean writeFailed = getHttpTransport().onStreamTimeout(failure);
return readFailed || writeFailed;
}
public void onFailure(Throwable failure) public void onFailure(Throwable failure)
{ {
onEarlyEOF(); onEarlyEOF();

View File

@ -43,7 +43,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
private static final Logger LOG = Log.getLogger(HttpTransportOverHTTP2.class); private static final Logger LOG = Log.getLogger(HttpTransportOverHTTP2.class);
private final AtomicBoolean commit = new AtomicBoolean(); private final AtomicBoolean commit = new AtomicBoolean();
private final Callback commitCallback = new CommitCallback(); private final TransportCallback transportCallback = new TransportCallback();
private final Connector connector; private final Connector connector;
private final HTTP2ServerConnection connection; private final HTTP2ServerConnection connection;
private IStream stream; private IStream stream;
@ -100,12 +100,22 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{ {
if (hasContent) if (hasContent)
{ {
commit(info, false, commitCallback); Callback commitCallback = new Callback.Nested(callback)
send(content, lastContent, callback); {
@Override
public void succeeded()
{
if (transportCallback.start(callback, false))
send(content, lastContent, transportCallback);
}
};
if (transportCallback.start(commitCallback, true))
commit(info, false, transportCallback);
} }
else else
{ {
commit(info, lastContent, callback); if (transportCallback.start(callback, false))
commit(info, lastContent, transportCallback);
} }
} }
else else
@ -117,7 +127,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{ {
if (hasContent || lastContent) if (hasContent || lastContent)
{ {
send(content, lastContent, callback); if (transportCallback.start(callback, false))
send(content, lastContent, transportCallback);
} }
else else
{ {
@ -186,6 +197,11 @@ public class HttpTransportOverHTTP2 implements HttpTransport
stream.data(frame, callback); stream.data(frame, callback);
} }
public boolean onStreamTimeout(Throwable failure)
{
return transportCallback.onIdleTimeout(failure);
}
@Override @Override
public void onCompleted() public void onCompleted()
{ {
@ -214,20 +230,99 @@ public class HttpTransportOverHTTP2 implements HttpTransport
stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP); stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP);
} }
private class CommitCallback implements Callback.NonBlocking private class TransportCallback implements Callback
{ {
private State state = State.IDLE;
private Callback callback;
private boolean commit;
public boolean start(Callback callback, boolean commit)
{
State state;
synchronized (this)
{
state = this.state;
if (state == State.IDLE)
{
this.state = State.WRITING;
this.callback = callback;
this.commit = commit;
return true;
}
}
callback.failed(new IllegalStateException("Invalid transport state: " + state));
return false;
}
@Override @Override
public void succeeded() public void succeeded()
{ {
boolean commit;
Callback callback = null;
synchronized (this)
{
commit = this.commit;
if (state != State.TIMEOUT)
{
callback = this.callback;
this.state = State.IDLE;
}
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{} committed", stream.getId()); LOG.debug("HTTP2 Response #{} {}", stream.getId(), commit ? "committed" : "flushed content");
if (callback != null)
callback.succeeded();
} }
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
boolean commit;
Callback callback = null;
synchronized (this)
{
commit = this.commit;
if (state != State.TIMEOUT)
{
callback = this.callback;
this.state = State.FAILED;
}
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #" + stream.getId() + " failed to commit", x); LOG.debug("HTTP2 Response #" + stream.getId() + " failed to " + (commit ? "commit" : "flush"), x);
if (callback != null)
callback.failed(x);
}
@Override
public boolean isNonBlocking()
{
return callback.isNonBlocking();
}
private boolean onIdleTimeout(Throwable failure)
{
boolean result;
Callback callback = null;
synchronized (this)
{
result = state == State.WRITING;
if (result)
{
callback = this.callback;
this.state = State.TIMEOUT;
}
}
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #" + stream.getId() + " idle timeout", failure);
if (result)
callback.failed(failure);
return result;
} }
} }
private enum State
{
IDLE, WRITING, FAILED, TIMEOUT
}
} }

View File

@ -419,8 +419,8 @@ public class HttpChannelState
_state=State.ASYNC_WAIT; _state=State.ASYNC_WAIT;
action=Action.WAIT; action=Action.WAIT;
if (_asyncReadUnready) if (_asyncReadUnready)
_channel.asyncReadFillInterested(); read_interested=true;
Scheduler scheduler = _channel.getScheduler(); Scheduler scheduler=_channel.getScheduler();
if (scheduler!=null && _timeoutMs>0) if (scheduler!=null && _timeoutMs>0)
_event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS)); _event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS));
} }
@ -454,6 +454,9 @@ public class HttpChannelState
} }
} }
if (read_interested)
_channel.asyncReadFillInterested();
return action; return action;
} }

View File

@ -65,7 +65,7 @@ public class HttpConfiguration
private boolean _delayDispatchUntilContent = true; private boolean _delayDispatchUntilContent = true;
private boolean _persistentConnectionsEnabled = true; private boolean _persistentConnectionsEnabled = true;
private int _maxErrorDispatches = 10; private int _maxErrorDispatches = 10;
private int _minRequestDataRate; private long _minRequestDataRate;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
@ -512,7 +512,7 @@ public class HttpConfiguration
/** /**
* @return The minimum request data rate in bytes per second; or &lt;=0 for no limit * @return The minimum request data rate in bytes per second; or &lt;=0 for no limit
*/ */
public int getMinRequestDataRate() public long getMinRequestDataRate()
{ {
return _minRequestDataRate; return _minRequestDataRate;
} }
@ -521,7 +521,7 @@ public class HttpConfiguration
/** /**
* @param bytesPerSecond The minimum request data rate in bytes per second; or &lt;=0 for no limit * @param bytesPerSecond The minimum request data rate in bytes per second; or &lt;=0 for no limit
*/ */
public void setMinRequestDataRate(int bytesPerSecond) public void setMinRequestDataRate(long bytesPerSecond)
{ {
_minRequestDataRate=bytesPerSecond; _minRequestDataRate=bytesPerSecond;
} }

View File

@ -124,9 +124,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
protected HttpChannelOverHttp newHttpChannel() protected HttpChannelOverHttp newHttpChannel()
{ {
HttpChannelOverHttp httpChannel = new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this); return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
return httpChannel;
} }
protected HttpParser newHttpParser(HttpCompliance compliance) protected HttpParser newHttpParser(HttpCompliance compliance)
@ -285,9 +283,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
while (_parser.inContentState()) while (_parser.inContentState())
{ {
int filled = fillRequestBuffer(); int filled = fillRequestBuffer();
boolean handle = parseRequestBuffer(); handled = parseRequestBuffer();
handled|=handle; if (handled || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
if (handle || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
break; break;
} }
return handled; return handled;

View File

@ -62,13 +62,11 @@ public class HttpInput extends ServletInputStream implements Runnable
private long _firstByteTimeStamp = -1; private long _firstByteTimeStamp = -1;
private long _contentArrived; private long _contentArrived;
private long _contentConsumed; private long _contentConsumed;
private long _blockingTimeoutAt = -1; private long _blockUntil;
public HttpInput(HttpChannelState state) public HttpInput(HttpChannelState state)
{ {
_channelState = state; _channelState = state;
if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout() > 0)
_blockingTimeoutAt = 0;
} }
protected HttpChannelState getHttpChannelState() protected HttpChannelState getHttpChannelState()
@ -91,7 +89,7 @@ public class HttpInput extends ServletInputStream implements Runnable
_contentArrived = 0; _contentArrived = 0;
_contentConsumed = 0; _contentConsumed = 0;
_firstByteTimeStamp = -1; _firstByteTimeStamp = -1;
_blockingTimeoutAt = -1; _blockUntil = 0;
} }
} }
@ -132,6 +130,10 @@ public class HttpInput extends ServletInputStream implements Runnable
executor.execute(channel); executor.execute(channel);
} }
private long getBlockingTimeout()
{
return getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout();
}
@Override @Override
public int read() throws IOException public int read() throws IOException
@ -147,10 +149,17 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
synchronized (_inputQ) synchronized (_inputQ)
{ {
if (_blockingTimeoutAt >= 0 && !isAsync()) if (!isAsync())
_blockingTimeoutAt = System.currentTimeMillis() + getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); {
if (_blockUntil == 0)
{
long blockingTimeout = getBlockingTimeout();
if (blockingTimeout > 0)
_blockUntil = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(blockingTimeout);
}
}
int minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate(); long minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate();
if (minRequestDataRate > 0 && _firstByteTimeStamp != -1) if (minRequestDataRate > 0 && _firstByteTimeStamp != -1)
{ {
long period = System.nanoTime() - _firstByteTimeStamp; long period = System.nanoTime() - _firstByteTimeStamp;
@ -374,9 +383,9 @@ public class HttpInput extends ServletInputStream implements Runnable
try try
{ {
long timeout = 0; long timeout = 0;
if (_blockingTimeoutAt >= 0) if (_blockUntil != 0)
{ {
timeout = _blockingTimeoutAt - System.currentTimeMillis(); timeout = TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime());
if (timeout <= 0) if (timeout <= 0)
throw new TimeoutException(); throw new TimeoutException();
} }
@ -388,8 +397,11 @@ public class HttpInput extends ServletInputStream implements Runnable
else else
_inputQ.wait(); _inputQ.wait();
if (_blockingTimeoutAt > 0 && System.currentTimeMillis() >= _blockingTimeoutAt) // TODO: cannot return unless there is content or timeout,
throw new TimeoutException(); // TODO: so spurious wakeups are not handled correctly.
if (_blockUntil != 0 && TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()) <= 0)
throw new TimeoutException(String.format("Blocking timeout %d ms", getBlockingTimeout()));
} }
catch (Throwable e) catch (Throwable e)
{ {
@ -550,7 +562,6 @@ public class HttpInput extends ServletInputStream implements Runnable
} }
} }
@Override @Override
public boolean isReady() public boolean isReady()
{ {

View File

@ -120,7 +120,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private static Logger LOG = Log.getLogger(HttpOutput.class); private static Logger LOG = Log.getLogger(HttpOutput.class);
private final HttpChannel _channel; private final HttpChannel _channel;
private final SharedBlockingCallback _writeBlock; private final SharedBlockingCallback _writeBlocker;
private Interceptor _interceptor; private Interceptor _interceptor;
/** /**
@ -155,19 +155,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
_channel = channel; _channel = channel;
_interceptor = channel; _interceptor = channel;
_writeBlock = new SharedBlockingCallback() _writeBlocker = new WriteBlocker(channel);
{
@Override
protected long getIdleTimeout()
{
long bto = getHttpChannel().getHttpConfiguration().getBlockingTimeout();
if (bto > 0)
return bto;
if (bto < 0)
return -1;
return _channel.getIdleTimeout();
}
};
HttpConfiguration config = channel.getHttpConfiguration(); HttpConfiguration config = channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize(); _bufferSize = config.getOutputBufferSize();
_commitSize = config.getOutputAggregationSize(); _commitSize = config.getOutputAggregationSize();
@ -221,12 +209,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
protected Blocker acquireWriteBlockingCallback() throws IOException protected Blocker acquireWriteBlockingCallback() throws IOException
{ {
return _writeBlock.acquire(); return _writeBlocker.acquire();
} }
private void write(ByteBuffer content, boolean complete) throws IOException private void write(ByteBuffer content, boolean complete) throws IOException
{ {
try (Blocker blocker = _writeBlock.acquire()) try (Blocker blocker = _writeBlocker.acquire())
{ {
write(content, complete, blocker); write(content, complete, blocker);
blocker.block(); blocker.block();
@ -670,7 +658,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/ */
public void sendContent(InputStream in) throws IOException public void sendContent(InputStream in) throws IOException
{ {
try (Blocker blocker = _writeBlock.acquire()) try (Blocker blocker = _writeBlocker.acquire())
{ {
new InputStreamWritingCB(in, blocker).iterate(); new InputStreamWritingCB(in, blocker).iterate();
blocker.block(); blocker.block();
@ -692,7 +680,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/ */
public void sendContent(ReadableByteChannel in) throws IOException public void sendContent(ReadableByteChannel in) throws IOException
{ {
try (Blocker blocker = _writeBlock.acquire()) try (Blocker blocker = _writeBlocker.acquire())
{ {
new ReadableByteChannelWritingCB(in, blocker).iterate(); new ReadableByteChannelWritingCB(in, blocker).iterate();
blocker.block(); blocker.block();
@ -714,7 +702,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/ */
public void sendContent(HttpContent content) throws IOException public void sendContent(HttpContent content) throws IOException
{ {
try (Blocker blocker = _writeBlock.acquire()) try (Blocker blocker = _writeBlocker.acquire())
{ {
sendContent(content, blocker); sendContent(content, blocker);
blocker.block(); blocker.block();
@ -1325,4 +1313,23 @@ public class HttpOutput extends ServletOutputStream implements Runnable
super.onCompleteFailure(x); super.onCompleteFailure(x);
} }
} }
private static class WriteBlocker extends SharedBlockingCallback
{
private final HttpChannel _channel;
private WriteBlocker(HttpChannel channel)
{
_channel = channel;
}
@Override
protected long getIdleTimeout()
{
long blockingTimeout = _channel.getHttpConfiguration().getBlockingTimeout();
if (blockingTimeout == 0)
return _channel.getIdleTimeout();
return blockingTimeout;
}
}
} }

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.containsString;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -740,189 +738,6 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
Assert.assertTrue(offset > 0); Assert.assertTrue(offset > 0);
} }
@Test(timeout=60000)
public void testMaxIdleWithDelayedDispatch() throws Exception
{
configureServer(new EchoHandler());
Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
client.setSoTimeout(10000);
Assert.assertFalse(client.isClosed());
OutputStream os=client.getOutputStream();
InputStream is=client.getInputStream();
String content="Wibble";
byte[] contentB=content.getBytes("utf-8");
os.write((
"POST /echo HTTP/1.1\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-type: text/plain; charset=utf-8\r\n"+
"content-length: "+contentB.length+"\r\n"+
"\r\n").getBytes("utf-8"));
os.flush();
long start = System.currentTimeMillis();
IO.toString(is);
Thread.sleep(sleepTime);
Assert.assertEquals(-1, is.read());
Assert.assertTrue(System.currentTimeMillis() - start > minimumTestRuntime);
Assert.assertTrue(System.currentTimeMillis() - start < maximumTestRuntime);
}
@Test(timeout=60000)
public void testSlowClientRequestNoLimit() throws Exception
{
configureServer(new EchoHandler());
Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
client.setSoTimeout(10000);
Assert.assertFalse(client.isClosed());
OutputStream os=client.getOutputStream();
InputStream is=client.getInputStream();
os.write((
"POST /echo HTTP/1.0\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-type: text/plain; charset=utf-8\r\n"+
"content-length: 20\r\n"+
"\r\n").getBytes("utf-8"));
os.flush();
for (int i=0;i<4;i++)
{
os.write("123\n".getBytes("utf-8"));
os.flush();
Thread.sleep(1000);
}
os.write("===\n".getBytes("utf-8"));
os.flush();
String response =IO.toString(is);
Assert.assertThat(response,containsString(" 200 "));
Assert.assertThat(response,containsString("==="));
}
@Test(timeout=60000)
public void testSlowClientRequestLimited() throws Exception
{
_httpConfiguration.setMinRequestDataRate(20);
configureServer(new EchoHandler());
Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
client.setSoTimeout(10000);
Assert.assertFalse(client.isClosed());
OutputStream os=client.getOutputStream();
InputStream is=client.getInputStream();
os.write((
"POST /echo HTTP/1.0\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-type: text/plain; charset=utf-8\r\n"+
"content-length: 20\r\n"+
"\r\n").getBytes("utf-8"));
os.flush();
try
{
for (int i=0;i<4;i++)
{
os.write("123\n".getBytes("utf-8"));
os.flush();
Thread.sleep(500);
}
os.write("===\n".getBytes("utf-8"));
os.flush();
String response =IO.toString(is);
Assert.assertThat(response,containsString(" 408 "));
Assert.assertThat(response,containsString("Request Data rate"));
}
catch (SocketException e)
{}
}
@Test(timeout=60000)
public void testSlowClientRequestLimitExceeded() throws Exception
{
_httpConfiguration.setMinRequestDataRate(20);
configureServer(new EchoHandler());
Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
client.setSoTimeout(10000);
Assert.assertFalse(client.isClosed());
OutputStream os=client.getOutputStream();
InputStream is=client.getInputStream();
os.write((
"POST /echo HTTP/1.0\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-type: text/plain; charset=utf-8\r\n"+
"content-length: 100\r\n"+
"\r\n").getBytes("utf-8"));
os.flush();
for (int i=0;i<9;i++)
{
os.write("123456789\n".getBytes("utf-8"));
os.flush();
Thread.sleep(250);
}
os.write("=========\n".getBytes("utf-8"));
os.flush();
String response =IO.toString(is);
Assert.assertThat(response,containsString(" 200 "));
Assert.assertThat(response,containsString("========="));
}
@Test(timeout=60000)
public void testHttpIdleTime() throws Exception
{
_httpConfiguration.setIdleTimeout(500);
configureServer(new EchoHandler());
Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
client.setSoTimeout(10000);
Assert.assertFalse(client.isClosed());
OutputStream os=client.getOutputStream();
InputStream is=client.getInputStream();
try (StacklessLogging scope = new StacklessLogging(HttpChannel.class))
{
os.write((
"POST /echo HTTP/1.0\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-type: text/plain; charset=utf-8\r\n"+
"content-length: 20\r\n"+
"\r\n").getBytes("utf-8"));
os.flush();
os.write("123456789\n".getBytes("utf-8"));
os.flush();
Thread.sleep(1000);
os.write("=========\n".getBytes("utf-8"));
os.flush();
String response =IO.toString(is);
Assert.assertThat(response,containsString(" 500 "));
Assert.assertThat(response,containsString("/500 ms"));
Assert.assertThat(response,Matchers.not(containsString("=========")));
}
}
protected static class SlowResponseHandler extends AbstractHandler protected static class SlowResponseHandler extends AbstractHandler
{ {
@Override @Override

View File

@ -18,20 +18,30 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.Socket; import java.net.Socket;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Locale; import java.util.Locale;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.session.SessionHandler; import org.eclipse.jetty.server.session.SessionHandler;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertTrue;
public class ServerConnectorTimeoutTest extends ConnectorTimeoutTest public class ServerConnectorTimeoutTest extends ConnectorTimeoutTest
{ {
@Before @Before
@ -113,4 +123,50 @@ public class ServerConnectorTimeoutTest extends ConnectorTimeoutTest
return response; return response;
} }
} }
@Test
public void testHttpWriteIdleTimeout() throws Exception
{
_httpConfiguration.setBlockingTimeout(500);
configureServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
client.setSoTimeout(10000);
Assert.assertFalse(client.isClosed());
OutputStream os=client.getOutputStream();
InputStream is=client.getInputStream();
try (StacklessLogging scope = new StacklessLogging(HttpChannel.class))
{
os.write((
"POST /echo HTTP/1.0\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-type: text/plain; charset=utf-8\r\n"+
"content-length: 20\r\n"+
"\r\n").getBytes("utf-8"));
os.flush();
os.write("123456789\n".getBytes("utf-8"));
os.flush();
Thread.sleep(1000);
os.write("=========\n".getBytes("utf-8"));
os.flush();
Thread.sleep(2000);
String response =IO.toString(is);
Assert.assertThat(response,containsString(" 500 "));
Assert.assertThat(response,containsString("/500 ms"));
Assert.assertThat(response, Matchers.not(containsString("=========")));
}
}
} }

View File

@ -1,3 +1,3 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=INFO org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.server.LEVEL=DEBUG #org.eclipse.jetty.server.LEVEL=DEBUG

View File

@ -86,8 +86,8 @@ public class SharedBlockingCallback
public Blocker acquire() throws IOException public Blocker acquire() throws IOException
{ {
_lock.lock();
long idle = getIdleTimeout(); long idle = getIdleTimeout();
_lock.lock();
try try
{ {
while (_blocker._state != IDLE) while (_blocker._state != IDLE)
@ -102,8 +102,9 @@ public class SharedBlockingCallback
_idle.await(); _idle.await();
} }
_blocker._state = null; _blocker._state = null;
return _blocker;
} }
catch (final InterruptedException e) catch (InterruptedException x)
{ {
throw new InterruptedIOException(); throw new InterruptedIOException();
} }
@ -111,7 +112,6 @@ public class SharedBlockingCallback
{ {
_lock.unlock(); _lock.unlock();
} }
return _blocker;
} }
protected void notComplete(Blocker blocker) protected void notComplete(Blocker blocker)
@ -173,8 +173,15 @@ public class SharedBlockingCallback
_state=cause; _state=cause;
_complete.signalAll(); _complete.signalAll();
} }
else if (_state instanceof BlockerTimeoutException)
{
// Failure arrived late, block() already
// modified the state, nothing more to do.
}
else else
{
throw new IllegalStateException(_state); throw new IllegalStateException(_state);
}
} }
finally finally
{ {
@ -191,19 +198,24 @@ public class SharedBlockingCallback
*/ */
public void block() throws IOException public void block() throws IOException
{ {
_lock.lock();
long idle = getIdleTimeout(); long idle = getIdleTimeout();
_lock.lock();
try try
{ {
while (_state == null) while (_state == null)
{ {
if (idle>0 && (idle < Long.MAX_VALUE/2)) if (idle > 0)
{ {
// Wait a little bit longer than expected callback idle timeout // Waiting here may compete with the idle timeout mechanism,
if (!_complete.await(idle+idle/2,TimeUnit.MILLISECONDS)) // so here we wait a little bit longer to favor the normal
// The callback has not arrived in sufficient time. // idle timeout mechanism that will call failed(Throwable).
// We will synthesize a TimeoutException long excess = Math.min(idle / 2, 1000);
_state=new BlockerTimeoutException(); if (!_complete.await(idle + excess, TimeUnit.MILLISECONDS))
{
// Method failed(Throwable) has not been called yet,
// so we will synthesize a special TimeoutException.
_state = new BlockerTimeoutException();
}
} }
else else
{ {

View File

@ -45,6 +45,7 @@ import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Assume;
import org.junit.Rule; import org.junit.Rule;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -61,6 +62,7 @@ public abstract class AbstractTest
@Rule @Rule
public final TestTracker tracker = new TestTracker(); public final TestTracker tracker = new TestTracker();
protected final HttpConfiguration httpConfig = new HttpConfiguration();
protected final Transport transport; protected final Transport transport;
protected SslContextFactory sslContextFactory; protected SslContextFactory sslContextFactory;
protected Server server; protected Server server;
@ -69,6 +71,7 @@ public abstract class AbstractTest
public AbstractTest(Transport transport) public AbstractTest(Transport transport)
{ {
Assume.assumeNotNull(transport);
this.transport = transport; this.transport = transport;
} }
@ -118,14 +121,13 @@ public abstract class AbstractTest
{ {
case HTTP: case HTTP:
{ {
result.add(new HttpConnectionFactory(new HttpConfiguration())); result.add(new HttpConnectionFactory(httpConfig));
break; break;
} }
case HTTPS: case HTTPS:
{ {
HttpConfiguration configuration = new HttpConfiguration(); httpConfig.addCustomizer(new SecureRequestCustomizer());
configuration.addCustomizer(new SecureRequestCustomizer()); HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
HttpConnectionFactory http = new HttpConnectionFactory(configuration);
SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, http.getProtocol()); SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, http.getProtocol());
result.add(ssl); result.add(ssl);
result.add(http); result.add(http);
@ -133,14 +135,13 @@ public abstract class AbstractTest
} }
case H2C: case H2C:
{ {
result.add(new HTTP2CServerConnectionFactory(new HttpConfiguration())); result.add(new HTTP2CServerConnectionFactory(httpConfig));
break; break;
} }
case H2: case H2:
{ {
HttpConfiguration configuration = new HttpConfiguration(); httpConfig.addCustomizer(new SecureRequestCustomizer());
configuration.addCustomizer(new SecureRequestCustomizer()); HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(httpConfig);
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(configuration);
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory("h2"); ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory("h2");
SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, alpn.getProtocol()); SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, alpn.getProtocol());
result.add(ssl); result.add(ssl);
@ -150,7 +151,7 @@ public abstract class AbstractTest
} }
case FCGI: case FCGI:
{ {
result.add(new ServerFCGIConnectionFactory(new HttpConfiguration())); result.add(new ServerFCGIConnectionFactory(httpConfig));
break; break;
} }
default: default:

View File

@ -0,0 +1,732 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.junit.Assert;
import org.junit.Test;
public class ServerTimeoutsTest extends AbstractTest
{
public ServerTimeoutsTest(Transport transport)
{
// Skip FCGI for now, not much interested in its server-side behavior.
super(transport == Transport.FCGI ? null : transport);
}
private void setServerIdleTimeout(long idleTimeout)
{
AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class);
if (h2 != null)
h2.setStreamIdleTimeout(idleTimeout);
else
connector.setIdleTimeout(idleTimeout);
}
@Test
public void testDelayedDispatchRequestWithDelayedFirstContentIdleTimeoutFires() throws Exception
{
httpConfig.setDelayDispatchUntilContent(true);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
handlerLatch.countDown();
}
});
long idleTimeout = 1000;
setServerIdleTimeout(idleTimeout);
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(new DeferredContentProvider())
.send(result ->
{
if (result.isFailed())
resultLatch.countDown();
});
// We did not send the content, the request was not
// dispatched, the server should have idle timed out.
Assert.assertFalse(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testNoBlockingTimeoutBlockingReadIdleTimeoutFires() throws Exception
{
httpConfig.setBlockingTimeout(-1);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingReadHandler(handlerLatch));
long idleTimeout = 1000;
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1));
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// Blocking read should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Complete the request.
contentProvider.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testBlockingTimeoutSmallerThanIdleTimeoutBlockingReadBlockingTimeoutFires() throws Exception
{
long blockingTimeout = 1000;
httpConfig.setBlockingTimeout(blockingTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingReadHandler(handlerLatch));
long idleTimeout = 3 * blockingTimeout;
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1));
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// Blocking read should timeout.
Assert.assertTrue(handlerLatch.await(2 * blockingTimeout, TimeUnit.MILLISECONDS));
// Complete the request.
contentProvider.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testBlockingTimeoutLargerThanIdleTimeoutBlockingReadIdleTimeoutFires() throws Exception
{
long idleTimeout = 1000;
long blockingTimeout = 3 * idleTimeout;
httpConfig.setBlockingTimeout(blockingTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingReadHandler(handlerLatch));
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1));
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// Blocking read should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Complete the request.
contentProvider.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testNoBlockingTimeoutBlockingWriteIdleTimeoutFires() throws Exception
{
httpConfig.setBlockingTimeout(-1);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingWriteHandler(handlerLatch));
long idleTimeout = 1000;
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
BlockingQueue<Callback> callbacks = new LinkedBlockingQueue<>();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.onResponseContentAsync((response, content, callback) ->
{
// Do not succeed the callback so the server will block writing.
callbacks.offer(callback);
})
.send(result ->
{
if (result.isFailed())
resultLatch.countDown();
});
// Blocking write should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// After the server stopped sending, consume on the client to read the early EOF.
while (true)
{
Callback callback = callbacks.poll(1, TimeUnit.SECONDS);
if (callback == null)
break;
callback.succeeded();
}
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testBlockingTimeoutSmallerThanIdleTimeoutBlockingWriteBlockingTimeoutFires() throws Exception
{
long blockingTimeout = 1000;
httpConfig.setBlockingTimeout(blockingTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingWriteHandler(handlerLatch));
long idleTimeout = 3 * blockingTimeout;
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
BlockingQueue<Callback> callbacks = new LinkedBlockingQueue<>();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.onResponseContentAsync((response, content, callback) ->
{
// Do not succeed the callback so the server will block writing.
callbacks.offer(callback);
})
.send(result ->
{
if (result.isFailed())
resultLatch.countDown();
});
// Blocking write should timeout.
Assert.assertTrue(handlerLatch.await(2 * blockingTimeout, TimeUnit.MILLISECONDS));
// After the server stopped sending, consume on the client to read the early EOF.
while (true)
{
Callback callback = callbacks.poll(1, TimeUnit.SECONDS);
if (callback == null)
break;
callback.succeeded();
}
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testBlockingTimeoutLargerThanIdleTimeoutBlockingWriteIdleTimeoutFires() throws Exception
{
long idleTimeout = 1000;
long blockingTimeout = 3 * idleTimeout;
httpConfig.setBlockingTimeout(blockingTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingWriteHandler(handlerLatch));
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
BlockingQueue<Callback> callbacks = new LinkedBlockingQueue<>();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.onResponseContentAsync((response, content, callback) ->
{
// Do not succeed the callback so the server will block writing.
callbacks.offer(callback);
})
.send(result ->
{
if (result.isFailed())
resultLatch.countDown();
});
// Blocking read should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// After the server stopped sending, consume on the client to read the early EOF.
while (true)
{
Callback callback = callbacks.poll(1, TimeUnit.SECONDS);
if (callback == null)
break;
callback.succeeded();
}
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testBlockingTimeoutWithSlowRead() throws Exception
{
long idleTimeout = 1000;
long blockingTimeout = 2 * idleTimeout;
httpConfig.setBlockingTimeout(blockingTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
}
catch (IOException x)
{
handlerLatch.countDown();
throw x;
}
}
});
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.content(contentProvider)
.send(result ->
{
// Result may fail to send the whole request body,
// but the response has arrived successfully.
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// The writes should be slow but not trigger the idle timeout.
long period = idleTimeout / 2;
long writes = 2 * (blockingTimeout / period);
for (long i = 0; i < writes; ++i)
{
contentProvider.offer(ByteBuffer.allocate(1));
Thread.sleep(period);
}
contentProvider.close();
// Blocking read should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testAsyncReadIdleTimeoutFires() throws Exception
{
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
Assert.assertEquals(0, input.read());
Assert.assertFalse(input.isReady());
}
@Override
public void onAllDataRead() throws IOException
{
}
@Override
public void onError(Throwable failure)
{
if (failure instanceof TimeoutException)
{
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
asyncContext.complete();
handlerLatch.countDown();
}
}
});
}
});
long idleTimeout = 1000;
setServerIdleTimeout(idleTimeout);
DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1));
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// Async read should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Complete the request.
contentProvider.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testAsyncWriteIdleTimeoutFires() throws Exception
{
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
ServletOutputStream output = response.getOutputStream();
output.setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
{
output.write(new byte[64 * 1024 * 1024]);
}
@Override
public void onError(Throwable failure)
{
if (failure instanceof TimeoutException)
{
asyncContext.complete();
handlerLatch.countDown();
}
}
});
}
});
long idleTimeout = 1000;
setServerIdleTimeout(idleTimeout);
BlockingQueue<Callback> callbacks = new LinkedBlockingQueue<>();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.onResponseContentAsync((response, content, callback) ->
{
// Do not succeed the callback so the server will block writing.
callbacks.offer(callback);
})
.send(result ->
{
if (result.isFailed())
resultLatch.countDown();
});
// Async write should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// After the server stopped sending, consume on the client to read the early EOF.
while (true)
{
Callback callback = callbacks.poll(1, TimeUnit.SECONDS);
if (callback == null)
break;
callback.succeeded();
}
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testBlockingReadWithMinimumDataRateBelowLimit() throws Exception
{
int bytesPerSecond = 20;
httpConfig.setMinRequestDataRate(bytesPerSecond);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
}
catch (BadMessageException x)
{
handlerLatch.countDown();
throw x;
}
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.REQUEST_TIMEOUT_408)
resultLatch.countDown();
});
for (int i = 0; i < 3; ++i)
{
contentProvider.offer(ByteBuffer.allocate(bytesPerSecond / 2));
Thread.sleep(1000);
}
contentProvider.close();
// Request should timeout.
Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testBlockingReadWithMinimumDataRateAboveLimit() throws Exception
{
int bytesPerSecond = 20;
httpConfig.setMinRequestDataRate(bytesPerSecond);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
handlerLatch.countDown();
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.OK_200)
resultLatch.countDown();
});
for (int i = 0; i < 3; ++i)
{
contentProvider.offer(ByteBuffer.allocate(bytesPerSecond * 2));
Thread.sleep(1000);
}
contentProvider.close();
Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testBlockingReadHttpIdleTimeoutOverridesIdleTimeout() throws Exception
{
long httpIdleTimeout = 1000;
long idleTimeout = 3 * httpIdleTimeout;
httpConfig.setIdleTimeout(httpIdleTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingReadHandler(handlerLatch));
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1));
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// Blocking read should timeout.
Assert.assertTrue(handlerLatch.await(2 * httpIdleTimeout, TimeUnit.MILLISECONDS));
// Complete the request.
contentProvider.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testAsyncReadHttpIdleTimeoutOverridesIdleTimeout() throws Exception
{
long httpIdleTimeout = 1000;
long idleTimeout = 3 * httpIdleTimeout;
httpConfig.setIdleTimeout(httpIdleTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
Assert.assertEquals(0, input.read());
Assert.assertFalse(input.isReady());
}
@Override
public void onAllDataRead() throws IOException
{
}
@Override
public void onError(Throwable failure)
{
if (failure instanceof TimeoutException)
{
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
asyncContext.complete();
handlerLatch.countDown();
}
}
});
}
});
setServerIdleTimeout(idleTimeout);
DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1));
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// Async read should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Complete the request.
contentProvider.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
private static class BlockingReadHandler extends AbstractHandler
{
private final CountDownLatch handlerLatch;
public BlockingReadHandler(CountDownLatch handlerLatch)
{
this.handlerLatch = handlerLatch;
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
Assert.assertEquals(0, input.read());
try
{
input.read();
}
catch (IOException x)
{
handlerLatch.countDown();
throw x;
}
}
}
private static class BlockingWriteHandler extends AbstractHandler
{
private final CountDownLatch handlerLatch;
private BlockingWriteHandler(CountDownLatch handlerLatch)
{
this.handlerLatch = handlerLatch;
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletOutputStream output = response.getOutputStream();
try
{
output.write(new byte[64 * 1024 * 1024]);
}
catch (IOException x)
{
handlerLatch.countDown();
throw x;
}
}
}
}