Merged branch 'jetty-9.3.x-845'.

This commit is contained in:
Simone Bordet 2016-09-05 21:36:04 +02:00
commit 89b20b4692
19 changed files with 1371 additions and 349 deletions

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.http; package org.eclipse.jetty.http;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.nio.BufferOverflowException; import java.nio.BufferOverflowException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;

View File

@ -50,6 +50,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxHeaderBlockFragment = 0; private int maxHeaderBlockFragment = 0;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private ExecutionStrategy.Factory executionStrategyFactory = new ProduceExecuteConsume.Factory(); private ExecutionStrategy.Factory executionStrategyFactory = new ProduceExecuteConsume.Factory();
private long streamIdleTimeout;
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration) public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{ {
@ -157,6 +158,17 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.executionStrategyFactory = executionStrategyFactory; this.executionStrategyFactory = executionStrategyFactory;
} }
@ManagedAttribute("The stream idle timeout in milliseconds")
public long getStreamIdleTimeout()
{
return streamIdleTimeout;
}
public void setStreamIdleTimeout(long streamIdleTimeout)
{
this.streamIdleTimeout = streamIdleTimeout;
}
public HttpConfiguration getHttpConfiguration() public HttpConfiguration getHttpConfiguration()
{ {
return httpConfiguration; return httpConfiguration;
@ -177,8 +189,11 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
// For a single stream in a connection, there will be a race between // For a single stream in a connection, there will be a race between
// the stream idle timeout and the connection idle timeout. However, // the stream idle timeout and the connection idle timeout. However,
// the typical case is that the connection will be busier and the // the typical case is that the connection will be busier and the
// stream idle timeout will expire earlier that the connection's. // stream idle timeout will expire earlier than the connection's.
session.setStreamIdleTimeout(endPoint.getIdleTimeout()); long streamIdleTimeout = getStreamIdleTimeout();
if (streamIdleTimeout <= 0)
streamIdleTimeout = endPoint.getIdleTimeout();
session.setStreamIdleTimeout(streamIdleTimeout);
session.setInitialSessionRecvWindow(getInitialSessionRecvWindow()); session.setInitialSessionRecvWindow(getInitialSessionRecvWindow());
ServerParser parser = newServerParser(connector, session); ServerParser parser = newServerParser(connector, session);

View File

@ -133,9 +133,9 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
public boolean onStreamTimeout(IStream stream, Throwable failure) public boolean onStreamTimeout(IStream stream, Throwable failure)
{ {
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
boolean result = !channel.isRequestHandled(); boolean result = channel.onStreamTimeout(failure);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processing" : "Ignoring", stream, failure); LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", stream, failure);
return result; return result;
} }
@ -157,7 +157,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
result &= !channel.isRequestHandled(); result &= !channel.isRequestHandled();
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processing" : "Ignoring", session, failure); LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure);
return result; return result;
} }

View File

@ -71,6 +71,18 @@ public class HttpChannelOverHTTP2 extends HttpChannel
return _expect100Continue; return _expect100Continue;
} }
@Override
public void setIdleTimeout(long timeoutMs)
{
getStream().setIdleTimeout(timeoutMs);
}
@Override
public long getIdleTimeout()
{
return getStream().getIdleTimeout();
}
public Runnable onRequest(HeadersFrame frame) public Runnable onRequest(HeadersFrame frame)
{ {
try try
@ -255,11 +267,11 @@ public class HttpChannelOverHTTP2 extends HttpChannel
handle); handle);
} }
boolean delayed = _delayedUntilContent; boolean wasDelayed = _delayedUntilContent;
_delayedUntilContent = false; _delayedUntilContent = false;
if (delayed) if (wasDelayed)
_handled = true; _handled = true;
return handle || delayed ? this : null; return handle || wasDelayed ? this : null;
} }
public boolean isRequestHandled() public boolean isRequestHandled()
@ -267,6 +279,21 @@ public class HttpChannelOverHTTP2 extends HttpChannel
return _handled; return _handled;
} }
public boolean onStreamTimeout(Throwable failure)
{
if (!_handled)
return true;
HttpInput input = getRequest().getHttpInput();
boolean readFailed = input.failed(failure);
if (readFailed)
handle();
boolean writeFailed = getHttpTransport().onStreamTimeout(failure);
return readFailed || writeFailed;
}
public void onFailure(Throwable failure) public void onFailure(Throwable failure)
{ {
onEarlyEOF(); onEarlyEOF();

View File

@ -43,7 +43,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
private static final Logger LOG = Log.getLogger(HttpTransportOverHTTP2.class); private static final Logger LOG = Log.getLogger(HttpTransportOverHTTP2.class);
private final AtomicBoolean commit = new AtomicBoolean(); private final AtomicBoolean commit = new AtomicBoolean();
private final Callback commitCallback = new CommitCallback(); private final TransportCallback transportCallback = new TransportCallback();
private final Connector connector; private final Connector connector;
private final HTTP2ServerConnection connection; private final HTTP2ServerConnection connection;
private IStream stream; private IStream stream;
@ -100,12 +100,22 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{ {
if (hasContent) if (hasContent)
{ {
commit(info, false, commitCallback); Callback commitCallback = new Callback.Nested(callback)
send(content, lastContent, callback); {
@Override
public void succeeded()
{
if (transportCallback.start(callback, false))
send(content, lastContent, transportCallback);
}
};
if (transportCallback.start(commitCallback, true))
commit(info, false, transportCallback);
} }
else else
{ {
commit(info, lastContent, callback); if (transportCallback.start(callback, false))
commit(info, lastContent, transportCallback);
} }
} }
else else
@ -117,7 +127,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{ {
if (hasContent || lastContent) if (hasContent || lastContent)
{ {
send(content, lastContent, callback); if (transportCallback.start(callback, false))
send(content, lastContent, transportCallback);
} }
else else
{ {
@ -186,6 +197,11 @@ public class HttpTransportOverHTTP2 implements HttpTransport
stream.data(frame, callback); stream.data(frame, callback);
} }
public boolean onStreamTimeout(Throwable failure)
{
return transportCallback.onIdleTimeout(failure);
}
@Override @Override
public void onCompleted() public void onCompleted()
{ {
@ -214,20 +230,99 @@ public class HttpTransportOverHTTP2 implements HttpTransport
stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP); stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP);
} }
private class CommitCallback implements Callback.NonBlocking private class TransportCallback implements Callback
{ {
private State state = State.IDLE;
private Callback callback;
private boolean commit;
public boolean start(Callback callback, boolean commit)
{
State state;
synchronized (this)
{
state = this.state;
if (state == State.IDLE)
{
this.state = State.WRITING;
this.callback = callback;
this.commit = commit;
return true;
}
}
callback.failed(new IllegalStateException("Invalid transport state: " + state));
return false;
}
@Override @Override
public void succeeded() public void succeeded()
{ {
boolean commit;
Callback callback = null;
synchronized (this)
{
commit = this.commit;
if (state != State.TIMEOUT)
{
callback = this.callback;
this.state = State.IDLE;
}
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{} committed", stream.getId()); LOG.debug("HTTP2 Response #{} {}", stream.getId(), commit ? "committed" : "flushed content");
if (callback != null)
callback.succeeded();
} }
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
boolean commit;
Callback callback = null;
synchronized (this)
{
commit = this.commit;
if (state != State.TIMEOUT)
{
callback = this.callback;
this.state = State.FAILED;
}
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #" + stream.getId() + " failed to commit", x); LOG.debug("HTTP2 Response #" + stream.getId() + " failed to " + (commit ? "commit" : "flush"), x);
if (callback != null)
callback.failed(x);
}
@Override
public boolean isNonBlocking()
{
return callback.isNonBlocking();
}
private boolean onIdleTimeout(Throwable failure)
{
boolean result;
Callback callback = null;
synchronized (this)
{
result = state == State.WRITING;
if (result)
{
callback = this.callback;
this.state = State.TIMEOUT;
}
}
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #" + stream.getId() + " idle timeout", failure);
if (result)
callback.failed(failure);
return result;
} }
} }
private enum State
{
IDLE, WRITING, FAILED, TIMEOUT
}
} }

View File

@ -24,6 +24,8 @@ import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel; import java.nio.channels.ByteChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;

View File

@ -80,6 +80,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
private final Response _response; private final Response _response;
private MetaData.Response _committedMetaData; private MetaData.Response _committedMetaData;
private RequestLog _requestLog; private RequestLog _requestLog;
private long _oldIdleTimeout;
/** Bytes written after interception (eg after compression) */ /** Bytes written after interception (eg after compression) */
private long _written; private long _written;
@ -596,6 +597,11 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
if (_configuration.getSendDateHeader() && !fields.contains(HttpHeader.DATE)) if (_configuration.getSendDateHeader() && !fields.contains(HttpHeader.DATE))
fields.put(_connector.getServer().getDateField()); fields.put(_connector.getServer().getDateField());
long idleTO=_configuration.getIdleTimeout();
_oldIdleTimeout=getIdleTimeout();
if (idleTO>=0 && _oldIdleTimeout!=idleTO)
setIdleTimeout(idleTO);
_request.setMetaData(request); _request.setMetaData(request);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -627,6 +633,10 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
if (_requestLog!=null ) if (_requestLog!=null )
_requestLog.log(_request, _response); _requestLog.log(_request, _response);
long idleTO=_configuration.getIdleTimeout();
if (idleTO>=0 && getIdleTimeout()!=_oldIdleTimeout)
setIdleTimeout(_oldIdleTimeout);
_transport.onCompleted(); _transport.onCompleted();
} }

View File

@ -419,8 +419,8 @@ public class HttpChannelState
_state=State.ASYNC_WAIT; _state=State.ASYNC_WAIT;
action=Action.WAIT; action=Action.WAIT;
if (_asyncReadUnready) if (_asyncReadUnready)
_channel.asyncReadFillInterested(); read_interested=true;
Scheduler scheduler = _channel.getScheduler(); Scheduler scheduler=_channel.getScheduler();
if (scheduler!=null && _timeoutMs>0) if (scheduler!=null && _timeoutMs>0)
_event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS)); _event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS));
} }
@ -454,6 +454,9 @@ public class HttpChannelState
} }
} }
if (read_interested)
_channel.asyncReadFillInterested();
return action; return action;
} }

View File

@ -56,6 +56,7 @@ public class HttpConfiguration
private int _responseHeaderSize=8*1024; private int _responseHeaderSize=8*1024;
private int _headerCacheSize=512; private int _headerCacheSize=512;
private int _securePort; private int _securePort;
private long _idleTimeout=-1;
private long _blockingTimeout=-1; private long _blockingTimeout=-1;
private String _secureScheme = HttpScheme.HTTPS.asString(); private String _secureScheme = HttpScheme.HTTPS.asString();
private boolean _sendServerVersion = true; private boolean _sendServerVersion = true;
@ -64,6 +65,7 @@ public class HttpConfiguration
private boolean _delayDispatchUntilContent = true; private boolean _delayDispatchUntilContent = true;
private boolean _persistentConnectionsEnabled = true; private boolean _persistentConnectionsEnabled = true;
private int _maxErrorDispatches = 10; private int _maxErrorDispatches = 10;
private long _minRequestDataRate;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
@ -113,6 +115,7 @@ public class HttpConfiguration
_headerCacheSize=config._headerCacheSize; _headerCacheSize=config._headerCacheSize;
_secureScheme=config._secureScheme; _secureScheme=config._secureScheme;
_securePort=config._securePort; _securePort=config._securePort;
_idleTimeout=config._idleTimeout;
_blockingTimeout=config._blockingTimeout; _blockingTimeout=config._blockingTimeout;
_sendDateHeader=config._sendDateHeader; _sendDateHeader=config._sendDateHeader;
_sendServerVersion=config._sendServerVersion; _sendServerVersion=config._sendServerVersion;
@ -206,6 +209,31 @@ public class HttpConfiguration
return _persistentConnectionsEnabled; return _persistentConnectionsEnabled;
} }
/* ------------------------------------------------------------ */
/** Get the max idle time in ms.
* <p>The max idle time is applied to a HTTP request for IO operations and
* delayed dispatch.
* @return the max idle time in ms or if == 0 implies an infinite timeout, &lt;0
* implies no HTTP channel timeout and the connection timeout is used instead.
*/
public long getIdleTimeout()
{
return _idleTimeout;
}
/* ------------------------------------------------------------ */
/** Set the max idle time in ms.
* <p>The max idle time is applied to a HTTP request for IO operations and
* delayed dispatch.
* @param timeoutMs the max idle time in ms or if == 0 implies an infinite timeout, &lt;0
* implies no HTTP channel timeout and the connection timeout is used instead.
*/
public void setIdleTimeout(long timeoutMs)
{
_idleTimeout=timeoutMs;
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Get the timeout applied to blocking operations. /** Get the timeout applied to blocking operations.
* <p>This timeout is in addition to the {@link Connector#getIdleTimeout()}, and applies * <p>This timeout is in addition to the {@link Connector#getIdleTimeout()}, and applies
@ -479,4 +507,22 @@ public class HttpConfiguration
{ {
_maxErrorDispatches=max; _maxErrorDispatches=max;
} }
/* ------------------------------------------------------------ */
/**
* @return The minimum request data rate in bytes per second; or &lt;=0 for no limit
*/
public long getMinRequestDataRate()
{
return _minRequestDataRate;
}
/* ------------------------------------------------------------ */
/**
* @param bytesPerSecond The minimum request data rate in bytes per second; or &lt;=0 for no limit
*/
public void setMinRequestDataRate(long bytesPerSecond)
{
_minRequestDataRate=bytesPerSecond;
}
} }

View File

@ -124,9 +124,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
protected HttpChannelOverHttp newHttpChannel() protected HttpChannelOverHttp newHttpChannel()
{ {
HttpChannelOverHttp httpChannel = new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this); return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
return httpChannel;
} }
protected HttpParser newHttpParser(HttpCompliance compliance) protected HttpParser newHttpParser(HttpCompliance compliance)
@ -285,9 +283,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
while (_parser.inContentState()) while (_parser.inContentState())
{ {
int filled = fillRequestBuffer(); int filled = fillRequestBuffer();
boolean handle = parseRequestBuffer(); handled = parseRequestBuffer();
handled|=handle; if (handled || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
if (handle || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
break; break;
} }
return handled; return handled;

View File

@ -25,11 +25,14 @@ import java.util.ArrayDeque;
import java.util.Deque; import java.util.Deque;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import javax.servlet.ReadListener; import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream; import javax.servlet.ServletInputStream;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -56,14 +59,14 @@ public class HttpInput extends ServletInputStream implements Runnable
private final HttpChannelState _channelState; private final HttpChannelState _channelState;
private ReadListener _listener; private ReadListener _listener;
private State _state = STREAM; private State _state = STREAM;
private long _firstByteTimeStamp = -1;
private long _contentArrived;
private long _contentConsumed; private long _contentConsumed;
private long _blockingTimeoutAt = -1; private long _blockUntil;
public HttpInput(HttpChannelState state) public HttpInput(HttpChannelState state)
{ {
_channelState=state; _channelState = state;
if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout()>0)
_blockingTimeoutAt=0;
} }
protected HttpChannelState getHttpChannelState() protected HttpChannelState getHttpChannelState()
@ -83,33 +86,36 @@ public class HttpInput extends ServletInputStream implements Runnable
} }
_listener = null; _listener = null;
_state = STREAM; _state = STREAM;
_contentArrived = 0;
_contentConsumed = 0; _contentConsumed = 0;
_firstByteTimeStamp = -1;
_blockUntil = 0;
} }
} }
@Override @Override
public int available() public int available()
{ {
int available=0; int available = 0;
boolean woken=false; boolean woken = false;
synchronized (_inputQ) synchronized (_inputQ)
{ {
Content content = _inputQ.peek(); Content content = _inputQ.peek();
if (content==null) if (content == null)
{ {
try try
{ {
produceContent(); produceContent();
} }
catch(IOException e) catch (IOException e)
{ {
woken=failed(e); woken = failed(e);
} }
content = _inputQ.peek(); content = _inputQ.peek();
} }
if (content!=null) if (content != null)
available= remaining(content); available = remaining(content);
} }
if (woken) if (woken)
@ -124,12 +130,16 @@ public class HttpInput extends ServletInputStream implements Runnable
executor.execute(channel); executor.execute(channel);
} }
private long getBlockingTimeout()
{
return getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout();
}
@Override @Override
public int read() throws IOException public int read() throws IOException
{ {
int read = read(_oneByteBuffer, 0, 1); int read = read(_oneByteBuffer, 0, 1);
if (read==0) if (read == 0)
throw new IllegalStateException("unready read=0"); throw new IllegalStateException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
} }
@ -139,17 +149,36 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
synchronized (_inputQ) synchronized (_inputQ)
{ {
if (_blockingTimeoutAt>=0 && !isAsync()) if (!isAsync())
_blockingTimeoutAt=System.currentTimeMillis()+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); {
if (_blockUntil == 0)
{
long blockingTimeout = getBlockingTimeout();
if (blockingTimeout > 0)
_blockUntil = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(blockingTimeout);
}
}
while(true) long minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate();
if (minRequestDataRate > 0 && _firstByteTimeStamp != -1)
{
long period = System.nanoTime() - _firstByteTimeStamp;
if (period > 0)
{
long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1);
if (_contentArrived < minimum_data)
throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408, String.format("Request data rate < %d B/s", minRequestDataRate));
}
}
while (true)
{ {
Content item = nextContent(); Content item = nextContent();
if (item!=null) if (item != null)
{ {
int l = get(item, b, off, len); int l = get(item, b, off, len);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} read {} from {}",this,l,item); LOG.debug("{} read {} from {}", this, l, item);
consumeNonContent(); consumeNonContent();
@ -167,6 +196,7 @@ public class HttpInput extends ServletInputStream implements Runnable
* produce more Content and add it via {@link #addContent(Content)}. * produce more Content and add it via {@link #addContent(Content)}.
* For protocols that are constantly producing (eg HTTP2) this can * For protocols that are constantly producing (eg HTTP2) this can
* be left as a noop; * be left as a noop;
*
* @throws IOException if unable to produce content * @throws IOException if unable to produce content
*/ */
protected void produceContent() throws IOException protected void produceContent() throws IOException
@ -183,7 +213,7 @@ public class HttpInput extends ServletInputStream implements Runnable
protected Content nextContent() throws IOException protected Content nextContent() throws IOException
{ {
Content content = pollContent(); Content content = pollContent();
if (content==null && !isFinished()) if (content == null && !isFinished())
{ {
produceContent(); produceContent();
content = pollContent(); content = pollContent();
@ -191,9 +221,11 @@ public class HttpInput extends ServletInputStream implements Runnable
return content; return content;
} }
/** Poll the inputQ for Content. /**
* Poll the inputQ for Content.
* Consumed buffers and {@link PoisonPillContent}s are removed and * Consumed buffers and {@link PoisonPillContent}s are removed and
* EOF state updated if need be. * EOF state updated if need be.
*
* @return Content or null * @return Content or null
*/ */
protected Content pollContent() protected Content pollContent()
@ -208,20 +240,20 @@ public class HttpInput extends ServletInputStream implements Runnable
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content); LOG.debug("{} consumed {}", this, content);
if (content==EOF_CONTENT) if (content == EOF_CONTENT)
{ {
if (_listener==null) if (_listener == null)
_state=EOF; _state = EOF;
else else
{ {
_state=AEOF; _state = AEOF;
boolean woken = _channelState.onReadReady(); // force callback? boolean woken = _channelState.onReadReady(); // force callback?
if (woken) if (woken)
wake(); wake();
} }
} }
else if (content==EARLY_EOF_CONTENT) else if (content == EARLY_EOF_CONTENT)
_state=EARLY_EOF; _state = EARLY_EOF;
content = _inputQ.peek(); content = _inputQ.peek();
} }
@ -261,7 +293,7 @@ public class HttpInput extends ServletInputStream implements Runnable
protected Content nextReadable() throws IOException protected Content nextReadable() throws IOException
{ {
Content content = pollReadable(); Content content = pollReadable();
if (content==null && !isFinished()) if (content == null && !isFinished())
{ {
produceContent(); produceContent();
content = pollReadable(); content = pollReadable();
@ -269,9 +301,11 @@ public class HttpInput extends ServletInputStream implements Runnable
return content; return content;
} }
/** Poll the inputQ for Content or EOF. /**
* Poll the inputQ for Content or EOF.
* Consumed buffers and non EOF {@link PoisonPillContent}s are removed. * Consumed buffers and non EOF {@link PoisonPillContent}s are removed.
* EOF state is not updated. * EOF state is not updated.
*
* @return Content, EOF or null * @return Content, EOF or null
*/ */
protected Content pollReadable() protected Content pollReadable()
@ -282,7 +316,7 @@ public class HttpInput extends ServletInputStream implements Runnable
// Skip consumed items at the head of the queue except EOF // Skip consumed items at the head of the queue except EOF
while (content != null) while (content != null)
{ {
if (content==EOF_CONTENT || content==EARLY_EOF_CONTENT || remaining(content)>0) if (content == EOF_CONTENT || content == EARLY_EOF_CONTENT || remaining(content) > 0)
return content; return content;
_inputQ.poll(); _inputQ.poll();
@ -307,17 +341,17 @@ public class HttpInput extends ServletInputStream implements Runnable
/** /**
* Copies the given content into the given byte buffer. * Copies the given content into the given byte buffer.
* *
* @param content the content to copy from * @param content the content to copy from
* @param buffer the buffer to copy into * @param buffer the buffer to copy into
* @param offset the buffer offset to start copying from * @param offset the buffer offset to start copying from
* @param length the space available in the buffer * @param length the space available in the buffer
* @return the number of bytes actually copied * @return the number of bytes actually copied
*/ */
protected int get(Content content, byte[] buffer, int offset, int length) protected int get(Content content, byte[] buffer, int offset, int length)
{ {
int l = Math.min(content.remaining(), length); int l = Math.min(content.remaining(), length);
content.getContent().get(buffer, offset, l); content.getContent().get(buffer, offset, l);
_contentConsumed+=l; _contentConsumed += l;
return l; return l;
} }
@ -325,16 +359,16 @@ public class HttpInput extends ServletInputStream implements Runnable
* Consumes the given content. * Consumes the given content.
* Calls the content succeeded if all content consumed. * Calls the content succeeded if all content consumed.
* *
* @param content the content to consume * @param content the content to consume
* @param length the number of bytes to consume * @param length the number of bytes to consume
*/ */
protected void skip(Content content, int length) protected void skip(Content content, int length)
{ {
int l = Math.min(content.remaining(), length); int l = Math.min(content.remaining(), length);
ByteBuffer buffer = content.getContent(); ByteBuffer buffer = content.getContent();
buffer.position(buffer.position()+l); buffer.position(buffer.position() + l);
_contentConsumed+=l; _contentConsumed += l;
if (l>0 && !content.hasContent()) if (l > 0 && !content.hasContent())
pollContent(); // hungry succeed pollContent(); // hungry succeed
} }
@ -348,23 +382,26 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
try try
{ {
long timeout=0; long timeout = 0;
if (_blockingTimeoutAt>=0) if (_blockUntil != 0)
{ {
timeout=_blockingTimeoutAt-System.currentTimeMillis(); timeout = TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime());
if (timeout<=0) if (timeout <= 0)
throw new TimeoutException(); throw new TimeoutException();
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} blocking for content timeout={}", this,timeout); LOG.debug("{} blocking for content timeout={}", this, timeout);
if (timeout>0) if (timeout > 0)
_inputQ.wait(timeout); _inputQ.wait(timeout);
else else
_inputQ.wait(); _inputQ.wait();
if (_blockingTimeoutAt>0 && System.currentTimeMillis()>=_blockingTimeoutAt) // TODO: cannot return unless there is content or timeout,
throw new TimeoutException(); // TODO: so spurious wakeups are not handled correctly.
if (_blockUntil != 0 && TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()) <= 0)
throw new TimeoutException(String.format("Blocking timeout %d ms", getBlockingTimeout()));
} }
catch (Throwable e) catch (Throwable e)
{ {
@ -377,23 +414,24 @@ public class HttpInput extends ServletInputStream implements Runnable
* <p>Typically used to push back content that has * <p>Typically used to push back content that has
* been read, perhaps mutated. The bytes prepended are * been read, perhaps mutated. The bytes prepended are
* deducted for the contentConsumed total</p> * deducted for the contentConsumed total</p>
*
* @param item the content to add * @param item the content to add
* @return true if content channel woken for read * @return true if content channel woken for read
*/ */
public boolean prependContent(Content item) public boolean prependContent(Content item)
{ {
boolean woken=false; boolean woken = false;
synchronized (_inputQ) synchronized (_inputQ)
{ {
_inputQ.push(item); _inputQ.push(item);
_contentConsumed-=item.remaining(); _contentConsumed -= item.remaining();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} prependContent {}", this, item); LOG.debug("{} prependContent {}", this, item);
if (_listener==null) if (_listener == null)
_inputQ.notify(); _inputQ.notify();
else else
woken=_channelState.onReadPossible(); woken = _channelState.onReadPossible();
} }
return woken; return woken;
@ -407,17 +445,20 @@ public class HttpInput extends ServletInputStream implements Runnable
*/ */
public boolean addContent(Content item) public boolean addContent(Content item)
{ {
boolean woken=false; boolean woken = false;
synchronized (_inputQ) synchronized (_inputQ)
{ {
if (_firstByteTimeStamp == -1)
_firstByteTimeStamp = System.nanoTime();
_contentArrived += item.remaining();
_inputQ.offer(item); _inputQ.offer(item);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, item); LOG.debug("{} addContent {}", this, item);
if (_listener==null) if (_listener == null)
_inputQ.notify(); _inputQ.notify();
else else
woken=_channelState.onReadPossible(); woken = _channelState.onReadPossible();
} }
return woken; return woken;
@ -427,7 +468,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
synchronized (_inputQ) synchronized (_inputQ)
{ {
return _inputQ.size()>0; return _inputQ.size() > 0;
} }
} }
@ -453,6 +494,7 @@ public class HttpInput extends ServletInputStream implements Runnable
* <p> * <p>
* Typically this will result in an EOFException being thrown * Typically this will result in an EOFException being thrown
* from a subsequent read rather than a -1 return. * from a subsequent read rather than a -1 return.
*
* @return true if content channel woken for read * @return true if content channel woken for read
*/ */
public boolean earlyEOF() public boolean earlyEOF()
@ -463,11 +505,12 @@ public class HttpInput extends ServletInputStream implements Runnable
/** /**
* This method should be called to signal that all the expected * This method should be called to signal that all the expected
* content arrived. * content arrived.
*
* @return true if content channel woken for read * @return true if content channel woken for read
*/ */
public boolean eof() public boolean eof()
{ {
return addContent(EOF_CONTENT); return addContent(EOF_CONTENT);
} }
public boolean consumeAll() public boolean consumeAll()
@ -506,7 +549,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
synchronized (_inputQ) synchronized (_inputQ)
{ {
return _state==ASYNC; return _state == ASYNC;
} }
} }
@ -519,7 +562,6 @@ public class HttpInput extends ServletInputStream implements Runnable
} }
} }
@Override @Override
public boolean isReady() public boolean isReady()
{ {
@ -527,18 +569,18 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
synchronized (_inputQ) synchronized (_inputQ)
{ {
if (_listener == null ) if (_listener == null)
return true; return true;
if (_state instanceof EOFState) if (_state instanceof EOFState)
return true; return true;
if (nextReadable()!=null) if (nextReadable() != null)
return true; return true;
_channelState.onReadUnready(); _channelState.onReadUnready();
} }
return false; return false;
} }
catch(IOException e) catch (IOException e)
{ {
LOG.ignore(e); LOG.ignore(e);
return true; return true;
@ -549,7 +591,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public void setReadListener(ReadListener readListener) public void setReadListener(ReadListener readListener)
{ {
readListener = Objects.requireNonNull(readListener); readListener = Objects.requireNonNull(readListener);
boolean woken=false; boolean woken = false;
try try
{ {
synchronized (_inputQ) synchronized (_inputQ)
@ -557,11 +599,11 @@ public class HttpInput extends ServletInputStream implements Runnable
if (_listener != null) if (_listener != null)
throw new IllegalStateException("ReadListener already set"); throw new IllegalStateException("ReadListener already set");
if (_state != STREAM) if (_state != STREAM)
throw new IllegalStateException("State "+STREAM+" != " + _state); throw new IllegalStateException("State " + STREAM + " != " + _state);
_state = ASYNC; _state = ASYNC;
_listener = readListener; _listener = readListener;
boolean content=nextContent()!=null; boolean content = nextContent() != null;
if (content) if (content)
woken = _channelState.onReadReady(); woken = _channelState.onReadReady();
@ -569,7 +611,7 @@ public class HttpInput extends ServletInputStream implements Runnable
_channelState.onReadUnready(); _channelState.onReadUnready();
} }
} }
catch(IOException e) catch (IOException e)
{ {
throw new RuntimeIOException(e); throw new RuntimeIOException(e);
} }
@ -580,7 +622,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public boolean failed(Throwable x) public boolean failed(Throwable x)
{ {
boolean woken=false; boolean woken = false;
synchronized (_inputQ) synchronized (_inputQ)
{ {
if (_state instanceof ErrorState) if (_state instanceof ErrorState)
@ -588,16 +630,15 @@ public class HttpInput extends ServletInputStream implements Runnable
else else
_state = new ErrorState(x); _state = new ErrorState(x);
if (_listener==null) if (_listener == null)
_inputQ.notify(); _inputQ.notify();
else else
woken=_channelState.onReadPossible(); woken = _channelState.onReadPossible();
} }
return woken; return woken;
} }
/* ------------------------------------------------------------ */
/* /*
* <p> * <p>
* While this class is-a Runnable, it should never be dispatched in it's own thread. It is a * While this class is-a Runnable, it should never be dispatched in it's own thread. It is a
@ -610,26 +651,26 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
final Throwable error; final Throwable error;
final ReadListener listener; final ReadListener listener;
boolean aeof=false; boolean aeof = false;
synchronized (_inputQ) synchronized (_inputQ)
{ {
if (_state==EOF) if (_state == EOF)
return; return;
if (_state==AEOF) if (_state == AEOF)
{ {
_state=EOF; _state = EOF;
aeof=true; aeof = true;
} }
listener = _listener; listener = _listener;
error = _state instanceof ErrorState?((ErrorState)_state).getError():null; error = _state instanceof ErrorState ? ((ErrorState)_state).getError() : null;
} }
try try
{ {
if (error!=null) if (error != null)
{ {
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
listener.onError(error); listener.onError(error);
@ -649,7 +690,7 @@ public class HttpInput extends ServletInputStream implements Runnable
LOG.debug(e); LOG.debug(e);
try try
{ {
if (aeof || error==null) if (aeof || error == null)
{ {
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
listener.onError(e); listener.onError(e);
@ -673,10 +714,10 @@ public class HttpInput extends ServletInputStream implements Runnable
Content content; Content content;
synchronized (_inputQ) synchronized (_inputQ)
{ {
state=_state; state = _state;
consumed=_contentConsumed; consumed = _contentConsumed;
q=_inputQ.size(); q = _inputQ.size();
content=_inputQ.peekFirst(); content = _inputQ.peekFirst();
} }
return String.format("%s@%x[c=%d,q=%d,[0]=%s,s=%s]", return String.format("%s@%x[c=%d,q=%d,[0]=%s,s=%s]",
getClass().getSimpleName(), getClass().getSimpleName(),
@ -690,10 +731,11 @@ public class HttpInput extends ServletInputStream implements Runnable
public static class PoisonPillContent extends Content public static class PoisonPillContent extends Content
{ {
private final String _name; private final String _name;
public PoisonPillContent(String name) public PoisonPillContent(String name)
{ {
super(BufferUtil.EMPTY_BUFFER); super(BufferUtil.EMPTY_BUFFER);
_name=name; _name = name;
} }
@Override @Override
@ -717,7 +759,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public Content(ByteBuffer content) public Content(ByteBuffer content)
{ {
_content=content; _content = content;
} }
@Override @Override
@ -745,7 +787,7 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override @Override
public String toString() public String toString()
{ {
return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content)); return String.format("Content@%x{%s}", hashCode(), BufferUtil.toDetailString(_content));
} }
} }
@ -770,9 +812,10 @@ public class HttpInput extends ServletInputStream implements Runnable
protected class ErrorState extends EOFState protected class ErrorState extends EOFState
{ {
final Throwable _error; final Throwable _error;
ErrorState(Throwable error) ErrorState(Throwable error)
{ {
_error=error; _error = error;
} }
public Throwable getError() public Throwable getError()
@ -791,7 +834,7 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override @Override
public String toString() public String toString()
{ {
return "ERROR:"+_error; return "ERROR:" + _error;
} }
} }

View File

@ -54,75 +54,78 @@ import org.eclipse.jetty.util.log.Logger;
* close the stream, to be reopened after the inclusion ends.</p> * close the stream, to be reopened after the inclusion ends.</p>
*/ */
public class HttpOutput extends ServletOutputStream implements Runnable public class HttpOutput extends ServletOutputStream implements Runnable
{ {
/** /**
* The HttpOutput.Inteceptor is a single intercept point for all * The HttpOutput.Interceptor is a single intercept point for all
* output written to the HttpOutput: via writer; via output stream; * output written to the HttpOutput: via writer; via output stream;
* asynchronously; or blocking. * asynchronously; or blocking.
* <p> * <p>
* The Interceptor can be used to implement translations (eg Gzip) or * The Interceptor can be used to implement translations (eg Gzip) or
* additional buffering that acts on all output. Interceptors are * additional buffering that acts on all output. Interceptors are
* created in a chain, so that multiple concerns may intercept. * created in a chain, so that multiple concerns may intercept.
* <p> * <p>
* The {@link HttpChannel} is an {@link Interceptor} and is always the * The {@link HttpChannel} is an {@link Interceptor} and is always the
* last link in any Interceptor chain. * last link in any Interceptor chain.
* <p> * <p>
* Responses are committed by the first call to * Responses are committed by the first call to
* {@link #write(ByteBuffer, boolean, Callback)} * {@link #write(ByteBuffer, boolean, Callback)}
* and closed by a call to {@link #write(ByteBuffer, boolean, Callback)} * and closed by a call to {@link #write(ByteBuffer, boolean, Callback)}
* with the last boolean set true. If no content is available to commit * with the last boolean set true. If no content is available to commit
* or close, then a null buffer is passed. * or close, then a null buffer is passed.
*/ */
public interface Interceptor public interface Interceptor
{ {
/** /**
* Write content. * Write content.
* The response is committed by the first call to write and is closed by * The response is committed by the first call to write and is closed by
* a call with last == true. Empty content buffers may be passed to * a call with last == true. Empty content buffers may be passed to
* force a commit or close. * force a commit or close.
* @param content The content to be written or an empty buffer. *
* @param last True if this is the last call to write * @param content The content to be written or an empty buffer.
* @param callback The callback to use to indicate {@link Callback#succeeded()} * @param last True if this is the last call to write
* or {@link Callback#failed(Throwable)}. * @param callback The callback to use to indicate {@link Callback#succeeded()}
* or {@link Callback#failed(Throwable)}.
*/ */
void write(ByteBuffer content, boolean last, Callback callback); void write(ByteBuffer content, boolean last, Callback callback);
/** /**
* @return The next Interceptor in the chain or null if this is the * @return The next Interceptor in the chain or null if this is the
* last Interceptor in the chain. * last Interceptor in the chain.
*/ */
Interceptor getNextInterceptor(); Interceptor getNextInterceptor();
/** /**
* @return True if the Interceptor is optimized to receive direct * @return True if the Interceptor is optimized to receive direct
* {@link ByteBuffer}s in the {@link #write(ByteBuffer, boolean, Callback)} * {@link ByteBuffer}s in the {@link #write(ByteBuffer, boolean, Callback)}
* method. If false is returned, then passing direct buffers may cause * method. If false is returned, then passing direct buffers may cause
* inefficiencies. * inefficiencies.
*/ */
boolean isOptimizedForDirectBuffers(); boolean isOptimizedForDirectBuffers();
/** /**
* Reset the buffers. * Reset the buffers.
* <p>If the Interceptor contains buffers then reset them. * <p>If the Interceptor contains buffers then reset them.
* @throws IllegalStateException Thrown if the response has been *
* committed and buffers and/or headers cannot be reset. * @throws IllegalStateException Thrown if the response has been
* committed and buffers and/or headers cannot be reset.
*/ */
default void resetBuffer() throws IllegalStateException default void resetBuffer() throws IllegalStateException
{ {
Interceptor next = getNextInterceptor(); Interceptor next = getNextInterceptor();
if (next!=null) if (next != null)
next.resetBuffer(); next.resetBuffer();
}; }
} }
private static Logger LOG = Log.getLogger(HttpOutput.class); private static Logger LOG = Log.getLogger(HttpOutput.class);
private final HttpChannel _channel; private final HttpChannel _channel;
private final SharedBlockingCallback _writeBlock; private final SharedBlockingCallback _writeBlocker;
private Interceptor _interceptor; private Interceptor _interceptor;
/** Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written. */ /**
* Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written.
*/
private long _written; private long _written;
private ByteBuffer _aggregate; private ByteBuffer _aggregate;
@ -130,6 +133,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private int _commitSize; private int _commitSize;
private WriteListener _writeListener; private WriteListener _writeListener;
private volatile Throwable _onError; private volatile Throwable _onError;
/* /*
ACTION OPEN ASYNC READY PENDING UNREADY CLOSED ACTION OPEN ASYNC READY PENDING UNREADY CLOSED
------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------
@ -140,33 +144,25 @@ public class HttpOutput extends ServletOutputStream implements Runnable
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
write completed - - - ASYNC READY->owp - write completed - - - ASYNC READY->owp -
*/ */
private enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED } private enum OutputState
private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN); {
OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED
}
private final AtomicReference<OutputState> _state = new AtomicReference<>(OutputState.OPEN);
public HttpOutput(HttpChannel channel) public HttpOutput(HttpChannel channel)
{ {
_channel = channel; _channel = channel;
_interceptor = channel; _interceptor = channel;
_writeBlock = new SharedBlockingCallback() _writeBlocker = new WriteBlocker(channel);
{
@Override
protected long getIdleTimeout()
{
long bto = getHttpChannel().getHttpConfiguration().getBlockingTimeout();
if (bto>0)
return bto;
if (bto<0)
return -1;
return _channel.getIdleTimeout();
}
};
HttpConfiguration config = channel.getHttpConfiguration(); HttpConfiguration config = channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize(); _bufferSize = config.getOutputBufferSize();
_commitSize = config.getOutputAggregationSize(); _commitSize = config.getOutputAggregationSize();
if (_commitSize>_bufferSize) if (_commitSize > _bufferSize)
{ {
LOG.warn("OutputAggregationSize {} exceeds bufferSize {}",_commitSize,_bufferSize); LOG.warn("OutputAggregationSize {} exceeds bufferSize {}", _commitSize, _bufferSize);
_commitSize=_bufferSize; _commitSize = _bufferSize;
} }
} }
@ -182,7 +178,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public void setInterceptor(Interceptor filter) public void setInterceptor(Interceptor filter)
{ {
_interceptor=filter; _interceptor = filter;
} }
public boolean isWritten() public boolean isWritten()
@ -202,7 +198,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private boolean isLastContentToWrite(int len) private boolean isLastContentToWrite(int len)
{ {
_written+=len; _written += len;
return _channel.getResponse().isAllContentWritten(_written); return _channel.getResponse().isAllContentWritten(_written);
} }
@ -213,12 +209,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
protected Blocker acquireWriteBlockingCallback() throws IOException protected Blocker acquireWriteBlockingCallback() throws IOException
{ {
return _writeBlock.acquire(); return _writeBlocker.acquire();
} }
private void write(ByteBuffer content, boolean complete) throws IOException private void write(ByteBuffer content, boolean complete) throws IOException
{ {
try (Blocker blocker = _writeBlock.acquire()) try (Blocker blocker = _writeBlocker.acquire())
{ {
write(content, complete, blocker); write(content, complete, blocker);
blocker.block(); blocker.block();
@ -248,9 +244,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override @Override
public void close() public void close()
{ {
while(true) while (true)
{ {
OutputState state=_state.get(); OutputState state = _state.get();
switch (state) switch (state)
{ {
case CLOSED: case CLOSED:
@ -259,18 +255,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
case UNREADY: case UNREADY:
{ {
if (_state.compareAndSet(state,OutputState.ERROR)) if (_state.compareAndSet(state, OutputState.ERROR))
_writeListener.onError(_onError==null?new EofException("Async close"):_onError); _writeListener.onError(_onError == null ? new EofException("Async close") : _onError);
break; break;
} }
default: default:
{ {
if (!_state.compareAndSet(state,OutputState.CLOSED)) if (!_state.compareAndSet(state, OutputState.CLOSED))
break; break;
try try
{ {
write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding()); write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
} }
catch (IOException x) catch (IOException x)
{ {
@ -293,9 +289,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/ */
void closed() void closed()
{ {
while(true) while (true)
{ {
OutputState state=_state.get(); OutputState state = _state.get();
switch (state) switch (state)
{ {
case CLOSED: case CLOSED:
@ -304,8 +300,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
case UNREADY: case UNREADY:
{ {
if (_state.compareAndSet(state,OutputState.ERROR)) if (_state.compareAndSet(state, OutputState.ERROR))
_writeListener.onError(_onError==null?new EofException("Async closed"):_onError); _writeListener.onError(_onError == null ? new EofException("Async closed") : _onError);
break; break;
} }
default: default:
@ -345,18 +341,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public boolean isClosed() public boolean isClosed()
{ {
return _state.get()==OutputState.CLOSED; return _state.get() == OutputState.CLOSED;
} }
@Override @Override
public void flush() throws IOException public void flush() throws IOException
{ {
while(true) while (true)
{ {
switch(_state.get()) switch (_state.get())
{ {
case OPEN: case OPEN:
write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false); write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false);
return; return;
case ASYNC: case ASYNC:
@ -388,9 +384,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public void write(byte[] b, int off, int len) throws IOException public void write(byte[] b, int off, int len) throws IOException
{ {
// Async or Blocking ? // Async or Blocking ?
while(true) while (true)
{ {
switch(_state.get()) switch (_state.get())
{ {
case OPEN: case OPEN:
// process blocking below // process blocking below
@ -405,7 +401,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Should we aggregate? // Should we aggregate?
boolean last = isLastContentToWrite(len); boolean last = isLastContentToWrite(len);
if (!last && len<=_commitSize) if (!last && len <= _commitSize)
{ {
if (_aggregate == null) if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers()); _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
@ -414,7 +410,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
int filled = BufferUtil.fill(_aggregate, b, off, len); int filled = BufferUtil.fill(_aggregate, b, off, len);
// return if we are not complete, not full and filled all the content // return if we are not complete, not full and filled all the content
if (filled==len && !BufferUtil.isFull(_aggregate)) if (filled == len && !BufferUtil.isFull(_aggregate))
{ {
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC)) if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
throw new IllegalStateException(); throw new IllegalStateException();
@ -422,12 +418,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
// adjust offset/length // adjust offset/length
off+=filled; off += filled;
len-=filled; len -= filled;
} }
// Do the asynchronous writing from the callback // Do the asynchronous writing from the callback
new AsyncWrite(b,off,len,last).iterate(); new AsyncWrite(b, off, len, last).iterate();
return; return;
case PENDING: case PENDING:
@ -451,7 +447,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Should we aggregate? // Should we aggregate?
int capacity = getBufferSize(); int capacity = getBufferSize();
boolean last = isLastContentToWrite(len); boolean last = isLastContentToWrite(len);
if (!last && len<=_commitSize) if (!last && len <= _commitSize)
{ {
if (_aggregate == null) if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers()); _aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers());
@ -460,21 +456,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable
int filled = BufferUtil.fill(_aggregate, b, off, len); int filled = BufferUtil.fill(_aggregate, b, off, len);
// return if we are not complete, not full and filled all the content // return if we are not complete, not full and filled all the content
if (filled==len && !BufferUtil.isFull(_aggregate)) if (filled == len && !BufferUtil.isFull(_aggregate))
return; return;
// adjust offset/length // adjust offset/length
off+=filled; off += filled;
len-=filled; len -= filled;
} }
// flush any content from the aggregate // flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
write(_aggregate, last && len==0); write(_aggregate, last && len == 0);
// should we fill aggregate again from the buffer? // should we fill aggregate again from the buffer?
if (len>0 && !last && len<=_commitSize && len<=BufferUtil.space(_aggregate)) if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate))
{ {
BufferUtil.append(_aggregate, b, off, len); BufferUtil.append(_aggregate, b, off, len);
return; return;
@ -482,26 +478,26 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
// write any remaining content in the buffer directly // write any remaining content in the buffer directly
if (len>0) if (len > 0)
{ {
// write a buffer capacity at a time to avoid JVM pooling large direct buffers // write a buffer capacity at a time to avoid JVM pooling large direct buffers
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
ByteBuffer view = ByteBuffer.wrap(b, off, len); ByteBuffer view = ByteBuffer.wrap(b, off, len);
while (len>getBufferSize()) while (len > getBufferSize())
{ {
int p=view.position(); int p = view.position();
int l=p+getBufferSize(); int l = p + getBufferSize();
view.limit(p+getBufferSize()); view.limit(p + getBufferSize());
write(view,false); write(view, false);
len-=getBufferSize(); len -= getBufferSize();
view.limit(l+Math.min(len,getBufferSize())); view.limit(l + Math.min(len, getBufferSize()));
view.position(l); view.position(l);
} }
write(view,last); write(view, last);
} }
else if (last) else if (last)
{ {
write(BufferUtil.EMPTY_BUFFER,true); write(BufferUtil.EMPTY_BUFFER, true);
} }
if (last) if (last)
@ -511,9 +507,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public void write(ByteBuffer buffer) throws IOException public void write(ByteBuffer buffer) throws IOException
{ {
// Async or Blocking ? // Async or Blocking ?
while(true) while (true)
{ {
switch(_state.get()) switch (_state.get())
{ {
case OPEN: case OPEN:
// process blocking below // process blocking below
@ -528,7 +524,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Do the asynchronous writing from the callback // Do the asynchronous writing from the callback
boolean last = isLastContentToWrite(buffer.remaining()); boolean last = isLastContentToWrite(buffer.remaining());
new AsyncWrite(buffer,last).iterate(); new AsyncWrite(buffer, last).iterate();
return; return;
case PENDING: case PENDING:
@ -547,17 +543,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
break; break;
} }
// handle blocking write // handle blocking write
int len=BufferUtil.length(buffer); int len = BufferUtil.length(buffer);
boolean last = isLastContentToWrite(len); boolean last = isLastContentToWrite(len);
// flush any content from the aggregate // flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
write(_aggregate, last && len==0); write(_aggregate, last && len == 0);
// write any remaining content in the buffer directly // write any remaining content in the buffer directly
if (len>0) if (len > 0)
write(buffer, last); write(buffer, last);
else if (last) else if (last)
write(BufferUtil.EMPTY_BUFFER, true); write(BufferUtil.EMPTY_BUFFER, true);
@ -569,13 +564,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override @Override
public void write(int b) throws IOException public void write(int b) throws IOException
{ {
_written+=1; _written += 1;
boolean complete=_channel.getResponse().isAllContentWritten(_written); boolean complete = _channel.getResponse().isAllContentWritten(_written);
// Async or Blocking ? // Async or Blocking ?
while(true) while (true)
{ {
switch(_state.get()) switch (_state.get())
{ {
case OPEN: case OPEN:
if (_aggregate == null) if (_aggregate == null)
@ -649,7 +644,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public void sendContent(ByteBuffer content) throws IOException public void sendContent(ByteBuffer content) throws IOException
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("sendContent({})",BufferUtil.toDetailString(content)); LOG.debug("sendContent({})", BufferUtil.toDetailString(content));
write(content, true); write(content, true);
closed(); closed();
@ -663,7 +658,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/ */
public void sendContent(InputStream in) throws IOException public void sendContent(InputStream in) throws IOException
{ {
try(Blocker blocker = _writeBlock.acquire()) try (Blocker blocker = _writeBlocker.acquire())
{ {
new InputStreamWritingCB(in, blocker).iterate(); new InputStreamWritingCB(in, blocker).iterate();
blocker.block(); blocker.block();
@ -685,7 +680,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/ */
public void sendContent(ReadableByteChannel in) throws IOException public void sendContent(ReadableByteChannel in) throws IOException
{ {
try(Blocker blocker = _writeBlock.acquire()) try (Blocker blocker = _writeBlocker.acquire())
{ {
new ReadableByteChannelWritingCB(in, blocker).iterate(); new ReadableByteChannelWritingCB(in, blocker).iterate();
blocker.block(); blocker.block();
@ -707,7 +702,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/ */
public void sendContent(HttpContent content) throws IOException public void sendContent(HttpContent content) throws IOException
{ {
try(Blocker blocker = _writeBlock.acquire()) try (Blocker blocker = _writeBlocker.acquire())
{ {
sendContent(content, blocker); sendContent(content, blocker);
blocker.block(); blocker.block();
@ -723,13 +718,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable
/** /**
* Asynchronous send of whole content. * Asynchronous send of whole content.
* @param content The whole content to send *
* @param content The whole content to send
* @param callback The callback to use to notify success or failure * @param callback The callback to use to notify success or failure
*/ */
public void sendContent(ByteBuffer content, final Callback callback) public void sendContent(ByteBuffer content, final Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("sendContent(buffer={},{})",BufferUtil.toDetailString(content),callback); LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback);
write(content, true, new Callback.Nested(callback) write(content, true, new Callback.Nested(callback)
{ {
@ -753,13 +749,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
* Asynchronous send of stream content. * Asynchronous send of stream content.
* The stream will be closed after reading all content. * The stream will be closed after reading all content.
* *
* @param in The stream content to send * @param in The stream content to send
* @param callback The callback to use to notify success or failure * @param callback The callback to use to notify success or failure
*/ */
public void sendContent(InputStream in, Callback callback) public void sendContent(InputStream in, Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("sendContent(stream={},{})",in,callback); LOG.debug("sendContent(stream={},{})", in, callback);
new InputStreamWritingCB(in, callback).iterate(); new InputStreamWritingCB(in, callback).iterate();
} }
@ -768,13 +764,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
* Asynchronous send of channel content. * Asynchronous send of channel content.
* The channel will be closed after reading all content. * The channel will be closed after reading all content.
* *
* @param in The channel content to send * @param in The channel content to send
* @param callback The callback to use to notify success or failure * @param callback The callback to use to notify success or failure
*/ */
public void sendContent(ReadableByteChannel in, Callback callback) public void sendContent(ReadableByteChannel in, Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("sendContent(channel={},{})",in,callback); LOG.debug("sendContent(channel={},{})", in, callback);
new ReadableByteChannelWritingCB(in, callback).iterate(); new ReadableByteChannelWritingCB(in, callback).iterate();
} }
@ -783,12 +779,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
* Asynchronous send of HTTP content. * Asynchronous send of HTTP content.
* *
* @param httpContent The HTTP content to send * @param httpContent The HTTP content to send
* @param callback The callback to use to notify success or failure * @param callback The callback to use to notify success or failure
*/ */
public void sendContent(HttpContent httpContent, Callback callback) public void sendContent(HttpContent httpContent, Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("sendContent(http={},{})",httpContent,callback); LOG.debug("sendContent(http={},{})", httpContent, callback);
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
@ -803,7 +799,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
while (true) while (true)
{ {
switch(_state.get()) switch (_state.get())
{ {
case OPEN: case OPEN:
if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING)) if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
@ -824,37 +820,36 @@ public class HttpOutput extends ServletOutputStream implements Runnable
break; break;
} }
ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null; ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null;
if (buffer == null) if (buffer == null)
buffer = httpContent.getIndirectBuffer(); buffer = httpContent.getIndirectBuffer();
if (buffer!=null) if (buffer != null)
{ {
sendContent(buffer,callback); sendContent(buffer, callback);
return; return;
} }
try try
{ {
ReadableByteChannel rbc=httpContent.getReadableByteChannel(); ReadableByteChannel rbc = httpContent.getReadableByteChannel();
if (rbc!=null) if (rbc != null)
{ {
// Close of the rbc is done by the async sendContent // Close of the rbc is done by the async sendContent
sendContent(rbc,callback); sendContent(rbc, callback);
return; return;
} }
InputStream in = httpContent.getInputStream(); InputStream in = httpContent.getInputStream();
if (in!=null) if (in != null)
{ {
sendContent(in,callback); sendContent(in, callback);
return; return;
} }
throw new IllegalArgumentException("unknown content for "+httpContent); throw new IllegalArgumentException("unknown content for " + httpContent);
} }
catch(Throwable th) catch (Throwable th)
{ {
abort(th); abort(th);
callback.failed(th); callback.failed(th);
@ -874,7 +869,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public void recycle() public void recycle()
{ {
_interceptor=_channel; _interceptor = _channel;
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
BufferUtil.clear(_aggregate); BufferUtil.clear(_aggregate);
_written = 0; _written = 0;
@ -914,7 +909,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
while (true) while (true)
{ {
switch(_state.get()) switch (_state.get())
{ {
case OPEN: case OPEN:
return true; return true;
@ -950,28 +945,29 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override @Override
public void run() public void run()
{ {
loop: while (true) loop:
while (true)
{ {
OutputState state = _state.get(); OutputState state = _state.get();
if(_onError!=null) if (_onError != null)
{ {
switch(state) switch (state)
{ {
case CLOSED: case CLOSED:
case ERROR: case ERROR:
{ {
_onError=null; _onError = null;
break loop; break loop;
} }
default: default:
{ {
if (_state.compareAndSet(state, OutputState.ERROR)) if (_state.compareAndSet(state, OutputState.ERROR))
{ {
Throwable th=_onError; Throwable th = _onError;
_onError=null; _onError = null;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onError",th); LOG.debug("onError", th);
_writeListener.onError(th); _writeListener.onError(th);
close(); close();
break loop; break loop;
@ -981,7 +977,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
continue; continue;
} }
switch(_state.get()) switch (_state.get())
{ {
case ASYNC: case ASYNC:
case READY: case READY:
@ -1003,7 +999,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
break; break;
default: default:
_onError=new IllegalStateException("state="+_state.get()); _onError = new IllegalStateException("state=" + _state.get());
} }
} }
} }
@ -1023,25 +1019,25 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get()); return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), _state.get());
} }
private abstract class AsyncICB extends IteratingCallback private abstract class AsyncICB extends IteratingCallback
{ {
final boolean _last; final boolean _last;
AsyncICB(boolean last) AsyncICB(boolean last)
{ {
_last=last; _last = last;
} }
@Override @Override
protected void onCompleteSuccess() protected void onCompleteSuccess()
{ {
while(true) while (true)
{ {
OutputState last=_state.get(); OutputState last = _state.get();
switch(last) switch (last)
{ {
case PENDING: case PENDING:
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC)) if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
@ -1070,7 +1066,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override @Override
public void onCompleteFailure(Throwable e) public void onCompleteFailure(Throwable e)
{ {
_onError=e==null?new IOException():e; _onError = e == null ? new IOException() : e;
if (_channel.getState().onWritePossible()) if (_channel.getState().onWritePossible())
_channel.execute(_channel); _channel.execute(_channel);
} }
@ -1090,15 +1086,15 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
_flushed=true; _flushed = true;
write(_aggregate, false, this); write(_aggregate, false, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
if (!_flushed) if (!_flushed)
{ {
_flushed=true; _flushed = true;
write(BufferUtil.EMPTY_BUFFER,false,this); write(BufferUtil.EMPTY_BUFFER, false, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
@ -1116,23 +1112,23 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public AsyncWrite(byte[] b, int off, int len, boolean last) public AsyncWrite(byte[] b, int off, int len, boolean last)
{ {
super(last); super(last);
_buffer=ByteBuffer.wrap(b, off, len); _buffer = ByteBuffer.wrap(b, off, len);
_len=len; _len = len;
// always use a view for large byte arrays to avoid JVM pooling large direct buffers // always use a view for large byte arrays to avoid JVM pooling large direct buffers
_slice=_len<getBufferSize()?null:_buffer.duplicate(); _slice = _len < getBufferSize() ? null : _buffer.duplicate();
} }
public AsyncWrite(ByteBuffer buffer, boolean last) public AsyncWrite(ByteBuffer buffer, boolean last)
{ {
super(last); super(last);
_buffer=buffer; _buffer = buffer;
_len=buffer.remaining(); _len = buffer.remaining();
// Use a slice buffer for large indirect to avoid JVM pooling large direct buffers // Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
if (_buffer.isDirect()||_len<getBufferSize()) if (_buffer.isDirect() || _len < getBufferSize())
_slice=null; _slice = null;
else else
{ {
_slice=_buffer.duplicate(); _slice = _buffer.duplicate();
} }
} }
@ -1142,16 +1138,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// flush any content from the aggregate // flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
_completed=_len==0; _completed = _len == 0;
write(_aggregate, _last && _completed, this); write(_aggregate, _last && _completed, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
// Can we just aggregate the remainder? // Can we just aggregate the remainder?
if (!_last && _len<BufferUtil.space(_aggregate) && _len<_commitSize) if (!_last && _len < BufferUtil.space(_aggregate) && _len < _commitSize)
{ {
int position = BufferUtil.flipToFill(_aggregate); int position = BufferUtil.flipToFill(_aggregate);
BufferUtil.put(_buffer,_aggregate); BufferUtil.put(_buffer, _aggregate);
BufferUtil.flipToFlush(_aggregate, position); BufferUtil.flipToFlush(_aggregate, position);
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
@ -1160,21 +1156,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (_buffer.hasRemaining()) if (_buffer.hasRemaining())
{ {
// if there is no slice, just write it // if there is no slice, just write it
if (_slice==null) if (_slice == null)
{ {
_completed=true; _completed = true;
write(_buffer, _last, this); write(_buffer, _last, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
// otherwise take a slice // otherwise take a slice
int p=_buffer.position(); int p = _buffer.position();
int l=Math.min(getBufferSize(),_buffer.remaining()); int l = Math.min(getBufferSize(), _buffer.remaining());
int pl=p+l; int pl = p + l;
_slice.limit(pl); _slice.limit(pl);
_buffer.position(pl); _buffer.position(pl);
_slice.position(p); _slice.position(p);
_completed=!_buffer.hasRemaining(); _completed = !_buffer.hasRemaining();
write(_slice, _last && _completed, this); write(_slice, _last && _completed, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
@ -1183,13 +1179,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// need to do so // need to do so
if (_last && !_completed) if (_last && !_completed)
{ {
_completed=true; _completed = true;
write(BufferUtil.EMPTY_BUFFER, true, this); write(BufferUtil.EMPTY_BUFFER, true, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
if (LOG.isDebugEnabled() && _completed) if (LOG.isDebugEnabled() && _completed)
LOG.debug("EOF of {}",this); LOG.debug("EOF of {}", this);
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
} }
@ -1211,7 +1207,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public InputStreamWritingCB(InputStream in, Callback callback) public InputStreamWritingCB(InputStream in, Callback callback)
{ {
super(callback); super(callback);
_in=in; _in = in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false); _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
} }
@ -1223,7 +1219,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (_eof) if (_eof)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("EOF of {}",this); LOG.debug("EOF of {}", this);
// Handle EOF // Handle EOF
_in.close(); _in.close();
closed(); closed();
@ -1232,20 +1228,20 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
// Read until buffer full or EOF // Read until buffer full or EOF
int len=0; int len = 0;
while (len<_buffer.capacity() && !_eof) while (len < _buffer.capacity() && !_eof)
{ {
int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len); int r = _in.read(_buffer.array(), _buffer.arrayOffset() + len, _buffer.capacity() - len);
if (r<0) if (r < 0)
_eof=true; _eof = true;
else else
len+=r; len += r;
} }
// write what we have // write what we have
_buffer.position(0); _buffer.position(0);
_buffer.limit(len); _buffer.limit(len);
write(_buffer,_eof,this); write(_buffer, _eof, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
@ -1259,8 +1255,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
} }
/* ------------------------------------------------------------ */ /**
/** An iterating callback that will take content from a * An iterating callback that will take content from a
* ReadableByteChannel and write it to the {@link HttpChannel}. * ReadableByteChannel and write it to the {@link HttpChannel}.
* A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
* {@link HttpChannel#useDirectBuffers()} is true. * {@link HttpChannel#useDirectBuffers()} is true.
@ -1277,7 +1273,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
{ {
super(callback); super(callback);
_in=in; _in = in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers()); _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
} }
@ -1289,7 +1285,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (_eof) if (_eof)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("EOF of {}",this); LOG.debug("EOF of {}", this);
_in.close(); _in.close();
closed(); closed();
_channel.getByteBufferPool().release(_buffer); _channel.getByteBufferPool().release(_buffer);
@ -1299,11 +1295,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Read from stream until buffer full or EOF // Read from stream until buffer full or EOF
BufferUtil.clearToFill(_buffer); BufferUtil.clearToFill(_buffer);
while (_buffer.hasRemaining() && !_eof) while (_buffer.hasRemaining() && !_eof)
_eof = (_in.read(_buffer)) < 0; _eof = (_in.read(_buffer)) < 0;
// write what we have // write what we have
BufferUtil.flipToFlush(_buffer, 0); BufferUtil.flipToFlush(_buffer, 0);
write(_buffer,_eof,this); write(_buffer, _eof, this);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
@ -1317,4 +1313,23 @@ public class HttpOutput extends ServletOutputStream implements Runnable
super.onCompleteFailure(x); super.onCompleteFailure(x);
} }
} }
private static class WriteBlocker extends SharedBlockingCallback
{
private final HttpChannel _channel;
private WriteBlocker(HttpChannel channel)
{
_channel = channel;
}
@Override
protected long getIdleTimeout()
{
long blockingTimeout = _channel.getHttpConfiguration().getBlockingTimeout();
if (blockingTimeout == 0)
return _channel.getIdleTimeout();
return blockingTimeout;
}
}
} }

View File

@ -546,7 +546,7 @@ public class ResourceHandler extends HandlerWrapper implements ResourceFactory
}; };
// Can we use a memory mapped file? // Can we use a memory mapped file?
if (_minMemoryMappedContentLength>0 && if (_minMemoryMappedContentLength>=0 &&
resource.length()>_minMemoryMappedContentLength && resource.length()>_minMemoryMappedContentLength &&
resource.length()<Integer.MAX_VALUE && resource.length()<Integer.MAX_VALUE &&
resource instanceof PathResource) resource instanceof PathResource)

View File

@ -73,7 +73,12 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
{ {
super.before(); super.before();
if (_httpConfiguration!=null) if (_httpConfiguration!=null)
{
_httpConfiguration.setBlockingTimeout(-1L); _httpConfiguration.setBlockingTimeout(-1L);
_httpConfiguration.setMinRequestDataRate(-1);
_httpConfiguration.setIdleTimeout(-1);
}
} }
@Test(timeout=60000) @Test(timeout=60000)
@ -732,41 +737,6 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
int offset=in.indexOf("Hello World"); int offset=in.indexOf("Hello World");
Assert.assertTrue(offset > 0); Assert.assertTrue(offset > 0);
} }
@Test(timeout=60000)
public void testMaxIdleWithDelayedDispatch() throws Exception
{
configureServer(new EchoHandler());
Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
client.setSoTimeout(10000);
Assert.assertFalse(client.isClosed());
OutputStream os=client.getOutputStream();
InputStream is=client.getInputStream();
String content="Wibble";
byte[] contentB=content.getBytes("utf-8");
os.write((
"POST /echo HTTP/1.1\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-type: text/plain; charset=utf-8\r\n"+
"content-length: "+contentB.length+"\r\n"+
"\r\n").getBytes("utf-8"));
os.flush();
long start = System.currentTimeMillis();
IO.toString(is);
Thread.sleep(sleepTime);
Assert.assertEquals(-1, is.read());
Assert.assertTrue(System.currentTimeMillis() - start > minimumTestRuntime);
Assert.assertTrue(System.currentTimeMillis() - start < maximumTestRuntime);
}
protected static class SlowResponseHandler extends AbstractHandler protected static class SlowResponseHandler extends AbstractHandler
{ {

View File

@ -18,27 +18,37 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.Socket; import java.net.Socket;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Locale; import java.util.Locale;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.session.SessionHandler; import org.eclipse.jetty.server.session.SessionHandler;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertTrue;
public class ServerConnectorTimeoutTest extends ConnectorTimeoutTest public class ServerConnectorTimeoutTest extends ConnectorTimeoutTest
{ {
@Before @Before
public void init() throws Exception public void init() throws Exception
{ {
ServerConnector connector = new ServerConnector(_server,1,1); ServerConnector connector = new ServerConnector(_server,1,1);
connector.setIdleTimeout(MAX_IDLE_TIME); // 250 msec max idle connector.setIdleTimeout(MAX_IDLE_TIME);
startServer(connector); startServer(connector);
} }
@ -113,4 +123,49 @@ public class ServerConnectorTimeoutTest extends ConnectorTimeoutTest
return response; return response;
} }
} }
@Test
public void testHttpWriteIdleTimeout() throws Exception
{
_httpConfiguration.setBlockingTimeout(500);
configureServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
client.setSoTimeout(10000);
Assert.assertFalse(client.isClosed());
OutputStream os=client.getOutputStream();
InputStream is=client.getInputStream();
try (StacklessLogging scope = new StacklessLogging(HttpChannel.class))
{
os.write((
"POST /echo HTTP/1.0\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-type: text/plain; charset=utf-8\r\n"+
"content-length: 20\r\n"+
"\r\n").getBytes("utf-8"));
os.flush();
os.write("123456789\n".getBytes("utf-8"));
os.flush();
Thread.sleep(1000);
os.write("=========\n".getBytes("utf-8"));
os.flush();
Thread.sleep(2000);
String response =IO.toString(is);
Assert.assertThat(response,containsString(" 500 "));
Assert.assertThat(response, Matchers.not(containsString("=========")));
}
}
} }

View File

@ -1,3 +1,3 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=INFO #org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.server.LEVEL=DEBUG #org.eclipse.jetty.server.LEVEL=DEBUG

View File

@ -59,16 +59,16 @@ public class SharedBlockingCallback
private final Condition _idle = _lock.newCondition(); private final Condition _idle = _lock.newCondition();
private final Condition _complete = _lock.newCondition(); private final Condition _complete = _lock.newCondition();
private Blocker _blocker = new Blocker(); private Blocker _blocker = new Blocker();
protected long getIdleTimeout() protected long getIdleTimeout()
{ {
return -1; return -1;
} }
public Blocker acquire() throws IOException public Blocker acquire() throws IOException
{ {
_lock.lock();
long idle = getIdleTimeout(); long idle = getIdleTimeout();
_lock.lock();
try try
{ {
while (_blocker._state != IDLE) while (_blocker._state != IDLE)
@ -83,8 +83,9 @@ public class SharedBlockingCallback
_idle.await(); _idle.await();
} }
_blocker._state = null; _blocker._state = null;
return _blocker;
} }
catch (final InterruptedException e) catch (InterruptedException x)
{ {
throw new InterruptedIOException(); throw new InterruptedIOException();
} }
@ -92,7 +93,6 @@ public class SharedBlockingCallback
{ {
_lock.unlock(); _lock.unlock();
} }
return _blocker;
} }
protected void notComplete(Blocker blocker) protected void notComplete(Blocker blocker)
@ -154,8 +154,15 @@ public class SharedBlockingCallback
_state=cause; _state=cause;
_complete.signalAll(); _complete.signalAll();
} }
else else if (_state instanceof BlockerTimeoutException)
{
// Failure arrived late, block() already
// modified the state, nothing more to do.
}
else
{
throw new IllegalStateException(_state); throw new IllegalStateException(_state);
}
} }
finally finally
{ {
@ -172,19 +179,24 @@ public class SharedBlockingCallback
*/ */
public void block() throws IOException public void block() throws IOException
{ {
_lock.lock();
long idle = getIdleTimeout(); long idle = getIdleTimeout();
_lock.lock();
try try
{ {
while (_state == null) while (_state == null)
{ {
if (idle>0 && (idle < Long.MAX_VALUE/2)) if (idle > 0)
{ {
// Wait a little bit longer than expected callback idle timeout // Waiting here may compete with the idle timeout mechanism,
if (!_complete.await(idle+idle/2,TimeUnit.MILLISECONDS)) // so here we wait a little bit longer to favor the normal
// The callback has not arrived in sufficient time. // idle timeout mechanism that will call failed(Throwable).
// We will synthesize a TimeoutException long excess = Math.min(idle / 2, 1000);
_state=new BlockerTimeoutException(); if (!_complete.await(idle + excess, TimeUnit.MILLISECONDS))
{
// Method failed(Throwable) has not been called yet,
// so we will synthesize a special TimeoutException.
_state = new BlockerTimeoutException();
}
} }
else else
{ {

View File

@ -45,6 +45,7 @@ import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Assume;
import org.junit.Rule; import org.junit.Rule;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -61,6 +62,7 @@ public abstract class AbstractTest
@Rule @Rule
public final TestTracker tracker = new TestTracker(); public final TestTracker tracker = new TestTracker();
protected final HttpConfiguration httpConfig = new HttpConfiguration();
protected final Transport transport; protected final Transport transport;
protected SslContextFactory sslContextFactory; protected SslContextFactory sslContextFactory;
protected Server server; protected Server server;
@ -69,6 +71,7 @@ public abstract class AbstractTest
public AbstractTest(Transport transport) public AbstractTest(Transport transport)
{ {
Assume.assumeNotNull(transport);
this.transport = transport; this.transport = transport;
} }
@ -118,14 +121,13 @@ public abstract class AbstractTest
{ {
case HTTP: case HTTP:
{ {
result.add(new HttpConnectionFactory(new HttpConfiguration())); result.add(new HttpConnectionFactory(httpConfig));
break; break;
} }
case HTTPS: case HTTPS:
{ {
HttpConfiguration configuration = new HttpConfiguration(); httpConfig.addCustomizer(new SecureRequestCustomizer());
configuration.addCustomizer(new SecureRequestCustomizer()); HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
HttpConnectionFactory http = new HttpConnectionFactory(configuration);
SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, http.getProtocol()); SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, http.getProtocol());
result.add(ssl); result.add(ssl);
result.add(http); result.add(http);
@ -133,14 +135,13 @@ public abstract class AbstractTest
} }
case H2C: case H2C:
{ {
result.add(new HTTP2CServerConnectionFactory(new HttpConfiguration())); result.add(new HTTP2CServerConnectionFactory(httpConfig));
break; break;
} }
case H2: case H2:
{ {
HttpConfiguration configuration = new HttpConfiguration(); httpConfig.addCustomizer(new SecureRequestCustomizer());
configuration.addCustomizer(new SecureRequestCustomizer()); HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(httpConfig);
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(configuration);
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory("h2"); ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory("h2");
SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, alpn.getProtocol()); SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, alpn.getProtocol());
result.add(ssl); result.add(ssl);
@ -150,7 +151,7 @@ public abstract class AbstractTest
} }
case FCGI: case FCGI:
{ {
result.add(new ServerFCGIConnectionFactory(new HttpConfiguration())); result.add(new ServerFCGIConnectionFactory(httpConfig));
break; break;
} }
default: default:

View File

@ -0,0 +1,732 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.junit.Assert;
import org.junit.Test;
public class ServerTimeoutsTest extends AbstractTest
{
public ServerTimeoutsTest(Transport transport)
{
// Skip FCGI for now, not much interested in its server-side behavior.
super(transport == Transport.FCGI ? null : transport);
}
private void setServerIdleTimeout(long idleTimeout)
{
AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class);
if (h2 != null)
h2.setStreamIdleTimeout(idleTimeout);
else
connector.setIdleTimeout(idleTimeout);
}
@Test
public void testDelayedDispatchRequestWithDelayedFirstContentIdleTimeoutFires() throws Exception
{
httpConfig.setDelayDispatchUntilContent(true);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
handlerLatch.countDown();
}
});
long idleTimeout = 1000;
setServerIdleTimeout(idleTimeout);
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(new DeferredContentProvider())
.send(result ->
{
if (result.isFailed())
resultLatch.countDown();
});
// We did not send the content, the request was not
// dispatched, the server should have idle timed out.
Assert.assertFalse(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testNoBlockingTimeoutBlockingReadIdleTimeoutFires() throws Exception
{
httpConfig.setBlockingTimeout(-1);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingReadHandler(handlerLatch));
long idleTimeout = 1000;
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1));
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// Blocking read should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Complete the request.
contentProvider.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testBlockingTimeoutSmallerThanIdleTimeoutBlockingReadBlockingTimeoutFires() throws Exception
{
long blockingTimeout = 1000;
httpConfig.setBlockingTimeout(blockingTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingReadHandler(handlerLatch));
long idleTimeout = 3 * blockingTimeout;
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1));
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// Blocking read should timeout.
Assert.assertTrue(handlerLatch.await(2 * blockingTimeout, TimeUnit.MILLISECONDS));
// Complete the request.
contentProvider.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testBlockingTimeoutLargerThanIdleTimeoutBlockingReadIdleTimeoutFires() throws Exception
{
long idleTimeout = 1000;
long blockingTimeout = 3 * idleTimeout;
httpConfig.setBlockingTimeout(blockingTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingReadHandler(handlerLatch));
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1));
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// Blocking read should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Complete the request.
contentProvider.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testNoBlockingTimeoutBlockingWriteIdleTimeoutFires() throws Exception
{
httpConfig.setBlockingTimeout(-1);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingWriteHandler(handlerLatch));
long idleTimeout = 1000;
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
BlockingQueue<Callback> callbacks = new LinkedBlockingQueue<>();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.onResponseContentAsync((response, content, callback) ->
{
// Do not succeed the callback so the server will block writing.
callbacks.offer(callback);
})
.send(result ->
{
if (result.isFailed())
resultLatch.countDown();
});
// Blocking write should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// After the server stopped sending, consume on the client to read the early EOF.
while (true)
{
Callback callback = callbacks.poll(1, TimeUnit.SECONDS);
if (callback == null)
break;
callback.succeeded();
}
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testBlockingTimeoutSmallerThanIdleTimeoutBlockingWriteBlockingTimeoutFires() throws Exception
{
long blockingTimeout = 1000;
httpConfig.setBlockingTimeout(blockingTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingWriteHandler(handlerLatch));
long idleTimeout = 3 * blockingTimeout;
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
BlockingQueue<Callback> callbacks = new LinkedBlockingQueue<>();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.onResponseContentAsync((response, content, callback) ->
{
// Do not succeed the callback so the server will block writing.
callbacks.offer(callback);
})
.send(result ->
{
if (result.isFailed())
resultLatch.countDown();
});
// Blocking write should timeout.
Assert.assertTrue(handlerLatch.await(2 * blockingTimeout, TimeUnit.MILLISECONDS));
// After the server stopped sending, consume on the client to read the early EOF.
while (true)
{
Callback callback = callbacks.poll(1, TimeUnit.SECONDS);
if (callback == null)
break;
callback.succeeded();
}
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testBlockingTimeoutLargerThanIdleTimeoutBlockingWriteIdleTimeoutFires() throws Exception
{
long idleTimeout = 1000;
long blockingTimeout = 3 * idleTimeout;
httpConfig.setBlockingTimeout(blockingTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingWriteHandler(handlerLatch));
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
BlockingQueue<Callback> callbacks = new LinkedBlockingQueue<>();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.onResponseContentAsync((response, content, callback) ->
{
// Do not succeed the callback so the server will block writing.
callbacks.offer(callback);
})
.send(result ->
{
if (result.isFailed())
resultLatch.countDown();
});
// Blocking read should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// After the server stopped sending, consume on the client to read the early EOF.
while (true)
{
Callback callback = callbacks.poll(1, TimeUnit.SECONDS);
if (callback == null)
break;
callback.succeeded();
}
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testBlockingTimeoutWithSlowRead() throws Exception
{
long idleTimeout = 1000;
long blockingTimeout = 2 * idleTimeout;
httpConfig.setBlockingTimeout(blockingTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
}
catch (IOException x)
{
handlerLatch.countDown();
throw x;
}
}
});
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.content(contentProvider)
.send(result ->
{
// Result may fail to send the whole request body,
// but the response has arrived successfully.
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// The writes should be slow but not trigger the idle timeout.
long period = idleTimeout / 2;
long writes = 2 * (blockingTimeout / period);
for (long i = 0; i < writes; ++i)
{
contentProvider.offer(ByteBuffer.allocate(1));
Thread.sleep(period);
}
contentProvider.close();
// Blocking read should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testAsyncReadIdleTimeoutFires() throws Exception
{
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
Assert.assertEquals(0, input.read());
Assert.assertFalse(input.isReady());
}
@Override
public void onAllDataRead() throws IOException
{
}
@Override
public void onError(Throwable failure)
{
if (failure instanceof TimeoutException)
{
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
asyncContext.complete();
handlerLatch.countDown();
}
}
});
}
});
long idleTimeout = 1000;
setServerIdleTimeout(idleTimeout);
DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1));
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// Async read should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Complete the request.
contentProvider.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testAsyncWriteIdleTimeoutFires() throws Exception
{
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
ServletOutputStream output = response.getOutputStream();
output.setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
{
output.write(new byte[64 * 1024 * 1024]);
}
@Override
public void onError(Throwable failure)
{
if (failure instanceof TimeoutException)
{
asyncContext.complete();
handlerLatch.countDown();
}
}
});
}
});
long idleTimeout = 1000;
setServerIdleTimeout(idleTimeout);
BlockingQueue<Callback> callbacks = new LinkedBlockingQueue<>();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.onResponseContentAsync((response, content, callback) ->
{
// Do not succeed the callback so the server will block writing.
callbacks.offer(callback);
})
.send(result ->
{
if (result.isFailed())
resultLatch.countDown();
});
// Async write should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// After the server stopped sending, consume on the client to read the early EOF.
while (true)
{
Callback callback = callbacks.poll(1, TimeUnit.SECONDS);
if (callback == null)
break;
callback.succeeded();
}
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testBlockingReadWithMinimumDataRateBelowLimit() throws Exception
{
int bytesPerSecond = 20;
httpConfig.setMinRequestDataRate(bytesPerSecond);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
}
catch (BadMessageException x)
{
handlerLatch.countDown();
throw x;
}
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.REQUEST_TIMEOUT_408)
resultLatch.countDown();
});
for (int i = 0; i < 3; ++i)
{
contentProvider.offer(ByteBuffer.allocate(bytesPerSecond / 2));
Thread.sleep(1000);
}
contentProvider.close();
// Request should timeout.
Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testBlockingReadWithMinimumDataRateAboveLimit() throws Exception
{
int bytesPerSecond = 20;
httpConfig.setMinRequestDataRate(bytesPerSecond);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
handlerLatch.countDown();
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.OK_200)
resultLatch.countDown();
});
for (int i = 0; i < 3; ++i)
{
contentProvider.offer(ByteBuffer.allocate(bytesPerSecond * 2));
Thread.sleep(1000);
}
contentProvider.close();
Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testBlockingReadHttpIdleTimeoutOverridesIdleTimeout() throws Exception
{
long httpIdleTimeout = 1000;
long idleTimeout = 3 * httpIdleTimeout;
httpConfig.setIdleTimeout(httpIdleTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new BlockingReadHandler(handlerLatch));
setServerIdleTimeout(idleTimeout);
try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class))
{
DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1));
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// Blocking read should timeout.
Assert.assertTrue(handlerLatch.await(2 * httpIdleTimeout, TimeUnit.MILLISECONDS));
// Complete the request.
contentProvider.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testAsyncReadHttpIdleTimeoutOverridesIdleTimeout() throws Exception
{
long httpIdleTimeout = 1000;
long idleTimeout = 3 * httpIdleTimeout;
httpConfig.setIdleTimeout(httpIdleTimeout);
CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
Assert.assertEquals(0, input.read());
Assert.assertFalse(input.isReady());
}
@Override
public void onAllDataRead() throws IOException
{
}
@Override
public void onError(Throwable failure)
{
if (failure instanceof TimeoutException)
{
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
asyncContext.complete();
handlerLatch.countDown();
}
}
});
}
});
setServerIdleTimeout(idleTimeout);
DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1));
CountDownLatch resultLatch = new CountDownLatch(1);
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500)
resultLatch.countDown();
});
// Async read should timeout.
Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Complete the request.
contentProvider.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
private static class BlockingReadHandler extends AbstractHandler
{
private final CountDownLatch handlerLatch;
public BlockingReadHandler(CountDownLatch handlerLatch)
{
this.handlerLatch = handlerLatch;
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
Assert.assertEquals(0, input.read());
try
{
input.read();
}
catch (IOException x)
{
handlerLatch.countDown();
throw x;
}
}
}
private static class BlockingWriteHandler extends AbstractHandler
{
private final CountDownLatch handlerLatch;
private BlockingWriteHandler(CountDownLatch handlerLatch)
{
this.handlerLatch = handlerLatch;
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletOutputStream output = response.getOutputStream();
try
{
output.write(new byte[64 * 1024 * 1024]);
}
catch (IOException x)
{
handlerLatch.countDown();
throw x;
}
}
}
}