Issue #1973 - Implement minimum response data rate (#2012)

* Code cleanups.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

* Improved test case handler.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

* Improved exception message.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

* Issue #1973 - Implement minimum response data rate.

Implemented response content data rate control in HttpOutput.

Introduced a WriteFlusher.Listener interface that produces events
for every flush(). These events are forwarded to the Connection
and from there to the HttpOutput so that the data rate control can
be enforced.

Both HTTP/1.1 and HTTP/2 are implemented.
Data rate control for HTTP/1.1 is approximate because it will count
also headers bytes and the chunk bytes, while for HTTP/2 is precise.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

* Issue #1973 - Implement minimum response data rate.

Addressed review comments.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2017-12-27 06:07:32 -08:00 committed by Greg Wilkins
parent 696b60a541
commit e86e8a752c
13 changed files with 385 additions and 117 deletions

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
@ -37,7 +38,7 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
public class HTTP2Connection extends AbstractConnection
public class HTTP2Connection extends AbstractConnection implements WriteFlusher.Listener
{
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
@ -176,6 +177,13 @@ public class HTTP2Connection extends AbstractConnection
}
}
@Override
public void onFlushed(long bytes) throws IOException
{
// TODO: add method to ISession ?
((HTTP2Session)session).onFlushed(bytes);
}
protected class HTTP2Producer implements ExecutionStrategy.Producer
{
private final Callback fillableCallback = new FillableCallback();

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -175,7 +176,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
{
if (entry.generate(lease))
{
if (entry.dataRemaining() > 0)
if (entry.getDataBytesRemaining() > 0)
entries.offer(entry);
}
else
@ -207,6 +208,31 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
return Action.SCHEDULED;
}
void onFlushed(long bytes) throws IOException
{
// For the given flushed bytes, we want to only
// forward those that belong to data frame content.
for (Entry entry : actives)
{
int frameBytesLeft = entry.getFrameBytesRemaining();
if (frameBytesLeft > 0)
{
int update = (int)Math.min(bytes, frameBytesLeft);
entry.onFrameBytesFlushed(update);
bytes -= update;
IStream stream = entry.stream;
if (stream != null && !entry.isControl())
{
Object channel = stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel instanceof WriteFlusher.Listener)
((WriteFlusher.Listener)channel).onFlushed(update - Frame.HEADER_LENGTH);
}
if (bytes == 0)
break;
}
}
}
@Override
public void succeeded()
{
@ -234,13 +260,13 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
for (int i = index; i < actives.size(); ++i)
{
Entry entry = actives.get(i);
if (entry.dataRemaining() > 0)
if (entry.getDataBytesRemaining() > 0)
append(entry);
}
for (int i = 0; i < index; ++i)
{
Entry entry = actives.get(i);
if (entry.dataRemaining() > 0)
if (entry.getDataBytesRemaining() > 0)
append(entry);
}
stalled = null;
@ -333,7 +359,11 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
this.stream = stream;
}
public int dataRemaining()
public abstract int getFrameBytesRemaining();
public abstract void onFrameBytesFlushed(int bytesFlushed);
public int getDataBytesRemaining()
{
return 0;
}
@ -387,6 +417,17 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
}
}
private boolean isControl()
{
switch (frame.getType())
{
case DATA:
return false;
default:
return true;
}
}
@Override
public String toString()
{

View File

@ -955,6 +955,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
}
void onFlushed(long bytes) throws IOException
{
flusher.onFlushed(bytes);
}
public void disconnect()
{
if (LOG.isDebugEnabled())
@ -1132,15 +1137,28 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private class ControlEntry extends HTTP2Flusher.Entry
{
private int bytes;
private int frameBytes;
private ControlEntry(Frame frame, IStream stream, Callback callback)
{
super(frame, stream, callback);
}
@Override
public int getFrameBytesRemaining()
{
return frameBytes;
}
@Override
public void onFrameBytesFlushed(int bytesFlushed)
{
frameBytes -= bytesFlushed;
}
protected boolean generate(ByteBufferPool.Lease lease)
{
bytes = generator.control(lease, frame);
bytes = frameBytes = generator.control(lease, frame);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}", frame);
prepare();
@ -1238,7 +1256,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private class DataEntry extends HTTP2Flusher.Entry
{
private int bytes;
private int dataRemaining;
private int frameBytes;
private int dataBytes;
private int dataWritten;
private DataEntry(DataFrame frame, IStream stream, Callback callback)
@ -1249,35 +1268,47 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// of data frames that cannot be completely written due to
// the flow control window exhausting, since in that case
// we would have to count the padding only once.
dataRemaining = frame.remaining();
dataBytes = frame.remaining();
}
@Override
public int dataRemaining()
public int getFrameBytesRemaining()
{
return dataRemaining;
return frameBytes;
}
@Override
public void onFrameBytesFlushed(int bytesFlushed)
{
frameBytes -= bytesFlushed;
}
@Override
public int getDataBytesRemaining()
{
return dataBytes;
}
protected boolean generate(ByteBufferPool.Lease lease)
{
int dataRemaining = dataRemaining();
int dataBytes = getDataBytesRemaining();
int sessionSendWindow = getSendWindow();
int streamSendWindow = stream.updateSendWindow(0);
int window = Math.min(streamSendWindow, sessionSendWindow);
if (window <= 0 && dataRemaining > 0)
if (window <= 0 && dataBytes > 0)
return false;
int length = Math.min(dataRemaining, window);
int length = Math.min(dataBytes, window);
// Only one DATA frame is generated.
bytes = generator.data(lease, (DataFrame)frame, length);
bytes = frameBytes = generator.data(lease, (DataFrame)frame, length);
int written = bytes - Frame.HEADER_LENGTH;
if (LOG.isDebugEnabled())
LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataRemaining);
LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataBytes);
this.dataWritten = written;
this.dataRemaining -= written;
this.dataBytes -= written;
flowControl.onDataSending(stream, written);
@ -1292,7 +1323,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// Do we have more to send ?
DataFrame dataFrame = (DataFrame)frame;
if (dataRemaining() == 0)
if (getDataBytesRemaining() == 0)
{
// Only now we can update the close state
// and eventually remove the stream.

View File

@ -62,6 +62,11 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
});
}
public HTTP2Client getHTTP2Client()
{
return client;
}
@ManagedAttribute(value = "The number of selectors", readonly = true)
public int getSelectors()
{

View File

@ -38,6 +38,7 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
@ -48,7 +49,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, WriteFlusher.Listener
{
private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
@ -85,6 +86,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
return getStream().getIdleTimeout();
}
@Override
public void onFlushed(long bytes) throws IOException
{
getResponse().getHttpOutput().onFlushed(bytes);
}
public Runnable onRequest(HeadersFrame frame)
{
try

View File

@ -35,7 +35,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling
* {@link EndPoint#flush(ByteBuffer...)} until all content is written.
@ -60,7 +59,7 @@ abstract public class WriteFlusher
// fill the state machine
__stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
__stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
__stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
__stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING, StateType.IDLE));
__stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
__stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
}
@ -104,29 +103,30 @@ abstract public class WriteFlusher
/**
* Tries to update the current state to the given new state.
*
* @param previous the expected current state
* @param next the desired new state
* @param next the desired new state
* @return the previous state or null if the state transition failed
* @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error)
*/
private boolean updateState(State previous,State next)
private boolean updateState(State previous, State next)
{
if (!isTransitionAllowed(previous,next))
if (!isTransitionAllowed(previous, next))
throw new IllegalStateException();
boolean updated = _state.compareAndSet(previous, next);
if (DEBUG)
LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
LOG.debug("update {}:{}{}{}", this, previous, updated ? "-->" : "!->", next);
return updated;
}
private void fail(PendingState pending)
{
State current = _state.get();
if (current.getType()==StateType.FAILED)
if (current.getType() == StateType.FAILED)
{
FailedState failed=(FailedState)current;
if (updateState(failed,__IDLE))
FailedState failed = (FailedState)current;
if (updateState(failed, __IDLE))
{
pending.fail(failed.getCause());
return;
@ -138,9 +138,9 @@ abstract public class WriteFlusher
private void ignoreFail()
{
State current = _state.get();
while (current.getType()==StateType.FAILED)
while (current.getType() == StateType.FAILED)
{
if (updateState(current,__IDLE))
if (updateState(current, __IDLE))
return;
current = _state.get();
}
@ -209,10 +209,11 @@ abstract public class WriteFlusher
private static class FailedState extends State
{
private final Throwable _cause;
private FailedState(Throwable cause)
{
super(StateType.FAILED);
_cause=cause;
_cause = cause;
}
public Throwable getCause()
@ -257,7 +258,7 @@ abstract public class WriteFlusher
protected boolean fail(Throwable cause)
{
if (_callback!=null)
if (_callback != null)
{
_callback.failed(cause);
return true;
@ -267,7 +268,7 @@ abstract public class WriteFlusher
protected void complete()
{
if (_callback!=null)
if (_callback != null)
_callback.succeeded();
}
@ -286,8 +287,8 @@ abstract public class WriteFlusher
{
State s = _state.get();
return (s instanceof PendingState)
?((PendingState)s).getCallbackInvocationType()
:Invocable.InvocationType.BLOCKING;
? ((PendingState)s).getCallbackInvocationType()
: Invocable.InvocationType.BLOCKING;
}
/**
@ -300,13 +301,13 @@ abstract public class WriteFlusher
* Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition
* fails it'll fail the callback.
*
* If not all buffers can be written in one go it creates a new <code>PendingState</code> object to preserve the state
* If not all buffers can be written in one go it creates a new {@code PendingState} object to preserve the state
* and then calls {@link #onIncompleteFlush()}. The remaining buffers will be written in {@link #completeWrite()}.
*
* If all buffers have been written it calls callback.complete().
*
* @param callback the callback to call on either failed or complete
* @param buffers the buffers to flush to the endpoint
* @param buffers the buffers to flush to the endpoint
* @throws WritePendingException if unable to write due to prior pending write
*/
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
@ -314,20 +315,20 @@ abstract public class WriteFlusher
if (DEBUG)
LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
if (!updateState(__IDLE,__WRITING))
if (!updateState(__IDLE, __WRITING))
throw new WritePendingException();
try
{
buffers=flush(buffers);
buffers = flush(buffers);
// if we are incomplete?
if (buffers!=null)
if (buffers != null)
{
if (DEBUG)
LOG.debug("flushed incomplete");
PendingState pending=new PendingState(buffers, callback);
if (updateState(__WRITING,pending))
PendingState pending = new PendingState(buffers, callback);
if (updateState(__WRITING, pending))
onIncompleteFlush();
else
fail(pending);
@ -335,18 +336,18 @@ abstract public class WriteFlusher
}
// If updateState didn't succeed, we don't care as our buffers have been written
if (!updateState(__WRITING,__IDLE))
if (!updateState(__WRITING, __IDLE))
ignoreFail();
if (callback!=null)
if (callback != null)
callback.succeeded();
}
catch (IOException e)
{
if (DEBUG)
LOG.debug("write exception", e);
if (updateState(__WRITING,__IDLE))
if (updateState(__WRITING, __IDLE))
{
if (callback!=null)
if (callback != null)
callback.failed(e);
}
else
@ -354,7 +355,6 @@ abstract public class WriteFlusher
}
}
/**
* Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this
* method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress.
@ -370,27 +370,27 @@ abstract public class WriteFlusher
State previous = _state.get();
if (previous.getType()!=StateType.PENDING)
if (previous.getType() != StateType.PENDING)
return; // failure already handled.
PendingState pending = (PendingState)previous;
if (!updateState(pending,__COMPLETING))
if (!updateState(pending, __COMPLETING))
return; // failure already handled.
try
{
ByteBuffer[] buffers = pending.getBuffers();
buffers=flush(buffers);
buffers = flush(buffers);
// if we are incomplete?
if (buffers!=null)
if (buffers != null)
{
if (DEBUG)
LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
if (buffers!=pending.getBuffers())
pending=new PendingState(buffers, pending._callback);
if (updateState(__COMPLETING,pending))
LOG.debug("flushed incomplete {}", BufferUtil.toDetailString(buffers));
if (buffers != pending.getBuffers())
pending = new PendingState(buffers, pending._callback);
if (updateState(__COMPLETING, pending))
onIncompleteFlush();
else
fail(pending);
@ -398,7 +398,7 @@ abstract public class WriteFlusher
}
// If updateState didn't succeed, we don't care as our buffers have been written
if (!updateState(__COMPLETING,__IDLE))
if (!updateState(__COMPLETING, __IDLE))
ignoreFail();
pending.complete();
}
@ -406,7 +406,7 @@ abstract public class WriteFlusher
{
if (DEBUG)
LOG.debug("completeWrite exception", e);
if(updateState(__COMPLETING,__IDLE))
if (updateState(__COMPLETING, __IDLE))
pending.fail(e);
else
fail(pending);
@ -422,59 +422,84 @@ abstract public class WriteFlusher
*/
protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
{
boolean progress=true;
while(progress && buffers!=null)
boolean progress = true;
while (progress && buffers != null)
{
int before=buffers.length==0?0:buffers[0].remaining();
boolean flushed=_endPoint.flush(buffers);
int r=buffers.length==0?0:buffers[0].remaining();
long before = remaining(buffers);
boolean flushed = _endPoint.flush(buffers);
long after = remaining(buffers);
long written = before - after;
if (LOG.isDebugEnabled())
LOG.debug("Flushed={} {}/{}+{} {}",flushed,before-r,before,buffers.length-1,this);
LOG.debug("Flushed={} written={} remaining={} {}", flushed, written, after, this);
if (written > 0)
{
Connection connection = _endPoint.getConnection();
if (connection instanceof Listener)
((Listener)connection).onFlushed(written);
}
if (flushed)
return null;
progress=before!=r;
progress = written > 0;
int not_empty=0;
while(r==0)
int index = 0;
while (true)
{
if (++not_empty==buffers.length)
if (index == buffers.length)
{
buffers=null;
not_empty=0;
// All buffers consumed.
buffers = null;
index = 0;
break;
}
progress=true;
r=buffers[not_empty].remaining();
else
{
int remaining = buffers[index].remaining();
if (remaining > 0)
break;
++index;
progress = true;
}
}
if (not_empty>0)
buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
if (index > 0)
buffers = Arrays.copyOfRange(buffers, index, buffers.length);
}
if (LOG.isDebugEnabled())
LOG.debug("!fully flushed {}",this);
LOG.debug("!fully flushed {}", this);
// If buffers is null, then flush has returned false but has consumed all the data!
// This is probably SSL being unable to flush the encrypted buffer, so return EMPTY_BUFFERS
// and that will keep this WriteFlusher pending.
return buffers==null?EMPTY_BUFFERS:buffers;
return buffers == null ? EMPTY_BUFFERS : buffers;
}
/* ------------------------------------------------------------ */
/** Notify the flusher of a failure
private long remaining(ByteBuffer[] buffers)
{
if (buffers == null)
return 0;
long result = 0;
for (ByteBuffer buffer : buffers)
result += buffer.remaining();
return result;
}
/**
* Notify the flusher of a failure
*
* @param cause The cause of the failure
* @return true if the flusher passed the failure to a {@link Callback} instance
*/
public boolean onFail(Throwable cause)
{
// Keep trying to handle the failure until we get to IDLE or FAILED state
while(true)
while (true)
{
State current=_state.get();
switch(current.getType())
State current = _state.get();
switch (current.getType())
{
case IDLE:
case FAILED:
@ -487,7 +512,7 @@ abstract public class WriteFlusher
LOG.debug("failed: " + this, cause);
PendingState pending = (PendingState)current;
if (updateState(pending,__IDLE))
if (updateState(pending, __IDLE))
return pending.fail(cause);
break;
@ -495,7 +520,7 @@ abstract public class WriteFlusher
if (DEBUG)
LOG.debug("failed: " + this, cause);
if (updateState(current,new FailedState(cause)))
if (updateState(current, new FailedState(cause)))
return false;
break;
}
@ -512,29 +537,9 @@ abstract public class WriteFlusher
return _state.get().getType() == StateType.IDLE;
}
public boolean isInProgress()
{
switch(_state.get().getType())
{
case WRITING:
case PENDING:
case COMPLETING:
return true;
default:
return false;
}
}
@Override
public String toString()
{
State s = _state.get();
return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s,s instanceof PendingState?((PendingState)s).getCallback():null);
}
public String toStateString()
{
switch(_state.get().getType())
switch (_state.get().getType())
{
case WRITING:
return "W";
@ -550,4 +555,33 @@ abstract public class WriteFlusher
return "?";
}
}
@Override
public String toString()
{
State s = _state.get();
return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s, s instanceof PendingState ? ((PendingState)s).getCallback() : null);
}
/**
* <p>A listener of {@link WriteFlusher} events.</p>
*/
public interface Listener
{
/**
* <p>Invoked when a {@link WriteFlusher} flushed bytes in a non-blocking way,
* as part of a - possibly larger - write.</p>
* <p>This method may be invoked multiple times, for example when writing a large
* buffer: a first flush of bytes, then the connection became TCP congested, and
* a subsequent flush of bytes when the connection became writable again.</p>
* <p>This method is never invoked concurrently, but may be invoked by different
* threads, so implementations may not rely on thread-local variables.</p>
* <p>Implementations may throw an {@link IOException} to signal that the write
* should fail, for example if the implementation enforces a minimum data rate.</p>
*
* @param bytes the number of bytes flushed
* @throws IOException if the write should fail
*/
void onFlushed(long bytes) throws IOException;
}
}

View File

@ -66,6 +66,7 @@ public class HttpConfiguration
private boolean _persistentConnectionsEnabled = true;
private int _maxErrorDispatches = 10;
private long _minRequestDataRate;
private long _minResponseDataRate;
private CookieCompliance _cookieCompliance = CookieCompliance.RFC6265;
private boolean _notifyRemoteAsyncErrors = true;
@ -127,6 +128,7 @@ public class HttpConfiguration
_persistentConnectionsEnabled=config._persistentConnectionsEnabled;
_maxErrorDispatches=config._maxErrorDispatches;
_minRequestDataRate=config._minRequestDataRate;
_minResponseDataRate=config._minResponseDataRate;
_cookieCompliance=config._cookieCompliance;
_notifyRemoteAsyncErrors=config._notifyRemoteAsyncErrors;
}
@ -496,7 +498,29 @@ public class HttpConfiguration
{
_minRequestDataRate=bytesPerSecond;
}
/**
* @return The minimum response data rate in bytes per second; or &lt;=0 for no limit
*/
@ManagedAttribute("The minimum response content data rate in bytes per second")
public long getMinResponseDataRate()
{
return _minResponseDataRate;
}
/**
* <p>Sets an minimum response content data rate.</p>
* <p>The value is enforced only approximately - not precisely - due to the fact that
* for efficiency reasons buffer writes may be comprised of both response headers and
* response content.</p>
*
* @param bytesPerSecond The minimum response data rate in bytes per second; or &lt;=0 for no limit
*/
public void setMinResponseDataRate(long bytesPerSecond)
{
_minResponseDataRate = bytesPerSecond;
}
public CookieCompliance getCookieCompliance()
{
return _cookieCompliance;

View File

@ -40,6 +40,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
@ -49,7 +50,7 @@ import org.eclipse.jetty.util.log.Logger;
/**
* <p>A {@link Connection} that handles the HTTP protocol.</p>
*/
public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom
public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom, WriteFlusher.Listener
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.asString());
@ -195,6 +196,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return null;
}
@Override
public void onFlushed(long bytes) throws IOException
{
// Unfortunately cannot distinguish between header and content
// bytes, and for content bytes whether they are chunked or not.
_channel.getResponse().getHttpOutput().onFlushed(bytes);
}
void releaseRequestBuffer()
{
if (_requestBuffer != null && !_requestBuffer.hasRemaining())

View File

@ -282,7 +282,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{
long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1);
if (_contentArrived < minimum_data)
throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request data rate < %d B/s",minRequestDataRate));
throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request content data rate < %d B/s",minRequestDataRate));
}
}

View File

@ -24,6 +24,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritePendingException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.RequestDispatcher;
@ -122,12 +123,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private final HttpChannel _channel;
private final SharedBlockingCallback _writeBlocker;
private Interceptor _interceptor;
/**
* Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written.
*/
private long _written;
private long _flushed;
private long _firstByteTimeStamp = -1;
private ByteBuffer _aggregate;
private int _bufferSize;
private int _commitSize;
@ -231,6 +229,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable
protected void write(ByteBuffer content, boolean complete, Callback callback)
{
if (_firstByteTimeStamp == -1)
{
long minDataRate = getHttpChannel().getHttpConfiguration().getMinResponseDataRate();
if (minDataRate > 0)
_firstByteTimeStamp = System.nanoTime();
else
_firstByteTimeStamp = Long.MAX_VALUE;
}
_interceptor.write(content, complete, callback);
}
@ -908,6 +914,30 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_commitSize = size;
}
/**
* <p>Invoked when bytes have been flushed to the network.</p>
* <p>The number of flushed bytes may be different from the bytes written
* by the application if an {@link Interceptor} changed them, for example
* by compressing them.</p>
*
* @param bytes the number of bytes flushed
* @throws IOException if the minimum data rate, when set, is not respected
* @see org.eclipse.jetty.io.WriteFlusher.Listener
*/
public void onFlushed(long bytes) throws IOException
{
if (_firstByteTimeStamp == -1 || _firstByteTimeStamp == Long.MAX_VALUE)
return;
long minDataRate = getHttpChannel().getHttpConfiguration().getMinResponseDataRate();
_flushed += bytes;
long elapsed = System.nanoTime() - _firstByteTimeStamp;
long minFlushed = minDataRate * TimeUnit.NANOSECONDS.toMillis(elapsed) / TimeUnit.SECONDS.toMillis(1);
if (LOG.isDebugEnabled())
LOG.debug("Flushed bytes min/actual {}/{}", minFlushed, _flushed);
if (_flushed < minFlushed)
throw new IOException(String.format("Response content data rate < %d B/s", minDataRate));
}
public void recycle()
{
_interceptor = _channel;
@ -920,6 +950,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_written = 0;
_writeListener = null;
_onError = null;
_firstByteTimeStamp = -1;
_flushed = 0;
reopen();
}

View File

@ -27,11 +27,16 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
public class EmptyServerHandler extends AbstractHandler
public class EmptyServerHandler extends AbstractHandler.ErrorDispatchHandler
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void doNonErrorHandle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
jettyRequest.setHandled(true);
service(target, jettyRequest, request, response);
}
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
}
}

View File

@ -37,7 +37,6 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.BytesContentProvider;
@ -497,9 +496,8 @@ public class HttpClientTest extends AbstractTest
start(new EmptyServerHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
super.handle(target, baseRequest, request, response);
response.getWriter().write("Jetty");
}
});

View File

@ -43,6 +43,8 @@ import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Request;
@ -50,7 +52,9 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
public class ServerTimeoutsTest extends AbstractTest
@ -736,6 +740,76 @@ public class ServerTimeoutsTest extends AbstractTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testBlockingWriteWithMinimumDataRateBelowLimit() throws Exception
{
// This test needs a large write to stall the server, and a slow reading client.
// In HTTP/1.1, when using the loopback interface, the buffers are so large that
// it would require a very large write (32 MiB) and a lot of time for this test
// to pass. On the first writes, the server fills in the large buffers with a lot
// of bytes (about 4 MiB), and so it would take a lot of time for the client to
// read those bytes and eventually produce a write rate that will make the server
// fail; and the write should be large enough to _not_ complete before the rate
// is below the minimum.
// In HTTP/2, we force the flow control window to be small, so that the server
// stalls almost immediately without having written many bytes, so that the test
// completes quickly.
Assume.assumeThat(transport, Matchers.isOneOf(Transport.H2, Transport.H2C));
int bytesPerSecond = 16 * 1024;
httpConfig.setMinResponseDataRate(bytesPerSecond);
CountDownLatch serverLatch = new CountDownLatch(1);
start(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
ServletOutputStream output = response.getOutputStream();
output.write(new byte[8 * 1024 * 1024]);
}
catch (IOException x)
{
serverLatch.countDown();
}
}
});
((HttpClientTransportOverHTTP2)client.getTransport()).getHTTP2Client().setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
// Setup the client to read slower than the min data rate.
BlockingQueue<Object> objects = new LinkedBlockingQueue<>();
CountDownLatch clientLatch = new CountDownLatch(1);
client.newRequest(newURI())
.onResponseContentAsync((response, content, callback) ->
{
objects.offer(content.remaining());
objects.offer(callback);
})
.send(result ->
{
objects.offer(-1);
objects.offer(Callback.NOOP);
if (result.isFailed())
clientLatch.countDown();
});
long readRate = bytesPerSecond / 2;
while (true)
{
int bytes = (Integer)objects.poll(5, TimeUnit.SECONDS);
if (bytes < 0)
break;
long ms = bytes * 1000L / readRate;
Thread.sleep(ms);
Callback callback = (Callback)objects.poll();
callback.succeeded();
}
Assert.assertTrue(serverLatch.await(15, TimeUnit.SECONDS));
Assert.assertTrue(clientLatch.await(15, TimeUnit.SECONDS));
}
private static class BlockingReadHandler extends AbstractHandler.ErrorDispatchHandler
{
private final CountDownLatch handlerLatch;