Get rid of addContent() by making produceContent() return Content instead.

Make EOF and errors be special content.
Transition to a much simplified FSM by using the needContent() / produceContent() model.
Implement blocking on top of async, this way there is only one FSM.
(Milestone 6)

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2020-03-20 09:48:11 +01:00
parent a4258ec9c0
commit 814dc69803
27 changed files with 2997 additions and 1020 deletions

View File

@ -1377,9 +1377,12 @@ public class SslBytesServerTest extends SslBytesTest
// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(100));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser an goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(150));
assertThat(sslFlushes.get(), Matchers.lessThan(50));
assertThat(httpParses.get(), Matchers.lessThan(100));
assertThat(httpParses.get(), Matchers.lessThan(150));
assertNull(request.get(5, TimeUnit.SECONDS));
@ -1399,9 +1402,12 @@ public class SslBytesServerTest extends SslBytesTest
// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(100));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser an goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(150));
assertThat(sslFlushes.get(), Matchers.lessThan(50));
assertThat(httpParses.get(), Matchers.lessThan(100));
assertThat(httpParses.get(), Matchers.lessThan(150));
closeClient(client);
}
@ -1596,9 +1602,12 @@ public class SslBytesServerTest extends SslBytesTest
// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(50));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser and goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(70));
assertThat(sslFlushes.get(), Matchers.lessThan(20));
assertThat(httpParses.get(), Matchers.lessThan(50));
assertThat(httpParses.get(), Matchers.lessThan(70));
closeClient(client);
}
@ -1743,9 +1752,12 @@ public class SslBytesServerTest extends SslBytesTest
// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(50));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser and goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(80));
assertThat(sslFlushes.get(), Matchers.lessThan(20));
assertThat(httpParses.get(), Matchers.lessThan(100));
assertThat(httpParses.get(), Matchers.lessThan(120));
closeClient(client);
}

View File

@ -41,6 +41,7 @@ import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,7 +50,8 @@ public class HttpChannelOverFCGI extends HttpChannel
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverFCGI.class);
private final Queue<HttpInput.Content> _contentQueue = new LinkedList<>();
private Throwable _contentFailure;
private final AutoLock _lock = new AutoLock();
private HttpInput.Content _specialContent;
private final HttpFields.Mutable fields = HttpFields.build();
private final Dispatcher dispatcher;
private String method;
@ -64,49 +66,99 @@ public class HttpChannelOverFCGI extends HttpChannel
this.dispatcher = new Dispatcher(connector.getServer().getThreadPool(), this);
}
void enqueueContent(HttpInput.Content content)
@Override
public boolean onContent(HttpInput.Content content)
{
boolean b = super.onContent(content);
Throwable failure;
synchronized (_contentQueue)
try (AutoLock l = _lock.lock())
{
failure = _contentFailure;
failure = _specialContent == null ? null : _specialContent.getError();
if (failure == null)
_contentQueue.offer(content);
}
if (failure != null)
content.failed(failure);
return b;
}
@Override
public void produceContent()
public boolean needContent()
{
try (AutoLock l = _lock.lock())
{
boolean hasContent = _specialContent != null || !_contentQueue.isEmpty();
if (LOG.isDebugEnabled())
LOG.debug("needContent has content? {}", hasContent);
return hasContent;
}
}
@Override
public HttpInput.Content produceContent()
{
HttpInput.Content content;
synchronized (_contentQueue)
try (AutoLock l = _lock.lock())
{
if (_contentFailure != null)
content = null;
else
content = _contentQueue.poll();
content = _contentQueue.poll();
if (content == null)
content = _specialContent;
}
if (content != null)
onContent(content);
if (LOG.isDebugEnabled())
LOG.debug("produceContent has produced {}", content);
return content;
}
@Override
public void failContent(Throwable failure)
public boolean failAllContent(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failing all content with {}", (Object)failure);
List<HttpInput.Content> copy;
synchronized (_contentQueue)
try (AutoLock l = _lock.lock())
{
if (_contentFailure == null)
_contentFailure = failure;
else if (_contentFailure != failure)
_contentFailure.addSuppressed(failure);
copy = new ArrayList<>(_contentQueue);
_contentQueue.clear();
}
copy.forEach(content -> content.failed(failure));
copy.forEach(c -> c.failed(failure));
HttpInput.Content lastContent = copy.isEmpty() ? null : copy.get(copy.size() - 1);
boolean atEof = lastContent != null && lastContent.isEof();
if (LOG.isDebugEnabled())
LOG.debug("failed all content, EOF = {}", atEof);
return atEof;
}
@Override
public boolean failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed " + x);
try (AutoLock l = _lock.lock())
{
Throwable error = _specialContent == null ? null : _specialContent.getError();
if (error != null && error != x)
error.addSuppressed(x);
else
_specialContent = new HttpInput.ErrorContent(x);
}
return getRequest().getHttpInput().onContentProducible();
}
@Override
protected boolean eof()
{
if (LOG.isDebugEnabled())
LOG.debug("received EOF");
try (AutoLock l = _lock.lock())
{
_specialContent = new HttpInput.EofContent();
}
return getRequest().getHttpInput().onContentProducible();
}
protected void header(HttpField field)
@ -179,12 +231,46 @@ public class HttpChannelOverFCGI extends HttpChannel
public boolean onIdleTimeout(Throwable timeout)
{
boolean handle = getRequest().getHttpInput().onIdleTimeout(timeout);
boolean handle = doOnIdleTimeout(timeout);
if (handle)
execute(this);
return !handle;
}
private boolean doOnIdleTimeout(Throwable x)
{
boolean neverDispatched = getState().isIdle();
boolean waitingForContent;
HttpInput.Content specialContent;
try (AutoLock l = _lock.lock())
{
waitingForContent = _contentQueue.isEmpty() || _contentQueue.peek().remaining() == 0;
specialContent = _specialContent;
}
if ((waitingForContent || neverDispatched) && specialContent == null)
{
x.addSuppressed(new Throwable("HttpInput idle timeout"));
try (AutoLock l = _lock.lock())
{
_specialContent = new HttpInput.ErrorContent(x);
}
return getRequest().getHttpInput().onContentProducible();
}
return false;
}
@Override
public void recycle()
{
try (AutoLock l = _lock.lock())
{
if (!_contentQueue.isEmpty())
throw new AssertionError("unconsumed content: " + _contentQueue);
_specialContent = null;
}
super.recycle();
}
private static class Dispatcher implements Runnable
{
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);

View File

@ -197,7 +197,7 @@ public class ServerFCGIConnection extends AbstractConnection
{
ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
copy.put(buffer).flip();
channel.enqueueContent(new HttpInput.Content(copy));
channel.onContent(new HttpInput.Content(copy));
}
return false;
}

View File

@ -22,6 +22,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.WritePendingException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
@ -77,7 +78,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private Listener listener;
private long dataLength;
private long dataDemand;
private Throwable failure;
private boolean dataInitial;
private boolean dataProcess;
@ -237,20 +237,18 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
}
@Override
public void fail(Throwable x)
public boolean failAllData(Throwable x)
{
List<DataEntry> copy;
try (AutoLock l = lock.lock())
{
dataDemand = 0;
failure = x;
while (true)
{
DataEntry dataEntry = dataQueue.poll();
if (dataEntry == null)
break;
dataEntry.callback.failed(x);
}
copy = new ArrayList<>(dataQueue);
dataQueue.clear();
}
copy.forEach(dataEntry -> dataEntry.callback.failed(x));
DataEntry lastDataEntry = copy.isEmpty() ? null : copy.get(copy.size() - 1);
return lastDataEntry != null && lastDataEntry.frame.isEndStream();
}
public boolean isLocallyClosed()
@ -418,12 +416,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
DataEntry entry = new DataEntry(frame, callback);
try (AutoLock l = lock.lock())
{
if (failure != null)
{
// stream has been failed
callback.failed(failure);
return;
}
dataQueue.offer(entry);
initial = dataInitial;
if (initial)
@ -463,8 +455,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
boolean proceed = false;
try (AutoLock l = lock.lock())
{
if (failure != null)
return; // stream has been failed
demand = dataDemand = MathUtils.cappedAdd(dataDemand, n);
if (!dataProcess)
dataProcess = proceed = !dataQueue.isEmpty();

View File

@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
public abstract class HTTP2StreamEndPoint implements EndPoint
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP2StreamEndPoint.class);
private static final Throwable EOF = new Throwable();
private final AutoLock lock = new AutoLock();
private final Deque<Entry> dataQueue = new ArrayDeque<>();
@ -217,6 +216,9 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
else
{
entry.succeed();
// WebSocket does not have a backpressure API so you must always demand
// the next frame after succeeding the previous one.
stream.demand(1);
}
return length;
}
@ -531,7 +533,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
{
if (buffer.hasRemaining())
offer(buffer, Callback.from(Callback.NOOP::succeeded, callback::failed), null);
offer(BufferUtil.EMPTY_BUFFER, callback, EOF);
offer(BufferUtil.EMPTY_BUFFER, callback, Entry.EOF);
}
else
{
@ -582,8 +584,10 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
writeState);
}
private class Entry
private static class Entry
{
private static final Throwable EOF = new Throwable();
private final ByteBuffer buffer;
private final Callback callback;
private final Throwable failure;
@ -610,7 +614,6 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
private void succeed()
{
callback.succeeded();
stream.demand(1);
}
private void fail(Throwable failure)

View File

@ -119,6 +119,14 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isRemotelyClosed();
/**
* Fail all data queued in the stream and reset
* demand to 0.
* @param x the exception to fail the data with.
* @return true if the end of the stream was reached, false otherwise.
*/
boolean failAllData(Throwable x);
/**
* @return whether this stream has been reset (locally or remotely) or has been failed
* @see #isReset()
@ -126,8 +134,6 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isResetOrFailed();
void fail(Throwable x);
/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/

View File

@ -0,0 +1,26 @@
@startuml
null:
content:
DEMANDING:
EOF:
[*] --> null
null --> DEMANDING : demand()
null --> EOF : eof()
null -left-> null : onTimeout()
DEMANDING --> DEMANDING : demand()
DEMANDING --> content : onContent()\n onTimeout()
DEMANDING --> EOF : eof()
EOF --> EOF : eof()\n onTimeout()
note bottom of content: content1 -> content2 is only\nvalid if content1 is special
note top of content: content -> null only happens\nwhen content is not special
content --> content : onContent()\n onTimeout()
content --> null: take()
content --> EOF: eof()
@enduml

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.http2.server;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.eclipse.jetty.http.BadMessageException;
@ -45,7 +46,6 @@ import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,68 +58,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
private boolean _expect100Continue;
private boolean _delayedUntilContent;
private boolean _useOutputDirectByteBuffers;
private final RequestContent _requestContent = new RequestContent();
private class RequestContent
{
private HttpInput.Content _content;
private boolean _endStream;
private boolean _producing;
private final AutoLock _lock = new AutoLock();
void setContent(boolean endStream, HttpInput.Content content)
{
try (AutoLock ignored = _lock.lock())
{
if (_content != null)
throw new AssertionError("content cannot be queued; stream=" + getStream());
_endStream = endStream;
_content = content;
_producing = false;
}
}
private HttpInput.Content takeContent(boolean[] endStreamResult)
{
try (AutoLock ignored = _lock.lock())
{
if (_content == null)
return null;
HttpInput.Content contentCopy = _content;
endStreamResult[0] = _endStream;
_content = null;
_endStream = false;
return contentCopy;
}
}
HttpInput.Content takeContentOrDemand(boolean[] endStreamResult)
{
HttpInput.Content content = takeContent(endStreamResult);
if (content != null)
return content;
boolean demand;
try (AutoLock ignored = _lock.lock())
{
demand = !_producing;
if (demand)
{
if (_content != null)
throw new AssertionError("_content should be null");
_producing = true;
}
}
if (demand)
getStream().demand(1);
return takeContent(endStreamResult);
}
}
private final ContentDemander _contentDemander;
public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
{
super(connector, configuration, endPoint, transport);
_contentDemander = new ContentDemander();
}
protected IStream getStream()
@ -192,14 +136,15 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
// Delay the demand of DATA frames for CONNECT with :protocol
// or for normal requests expecting 100 continue.
if (!connect)
if (connect)
{
if (!_expect100Continue)
getStream().demand(1);
if (request.getProtocol() == null)
_contentDemander.demand(false);
}
else if (request.getProtocol() == null)
else
{
getStream().demand(1);
if (_delayedUntilContent)
_contentDemander.demand(false);
}
if (LOG.isDebugEnabled())
@ -271,6 +216,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
{
_expect100Continue = false;
_delayedUntilContent = false;
_contentDemander.recycle();
super.recycle();
getHttpTransport().recycle();
}
@ -291,38 +237,16 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
@Override
public Runnable onData(DataFrame frame, Callback callback)
{
return onRequestContent(frame, callback);
}
public Runnable onRequestContent(DataFrame frame, final Callback callback)
{
Stream stream = getStream();
if (stream.isReset())
{
// Consume previously queued content to
// enlarge the session flow control window.
consumeInput();
// Consume immediately this content.
callback.succeeded();
return null;
}
ByteBuffer buffer = frame.getData();
int length = buffer.remaining();
if (LOG.isDebugEnabled())
HttpInput.Content content = new HttpInput.Content(buffer)
{
LOG.debug("HTTP2 Request #{}/{}: {} bytes of content",
stream.getId(),
Integer.toHexString(stream.getSession().hashCode()),
length);
}
@Override
public boolean isEof()
{
return frame.isEndStream();
}
boolean wasDelayed = _delayedUntilContent;
_delayedUntilContent = false;
_requestContent.setContent(frame.isEndStream(), new HttpInput.Content(buffer)
{
@Override
public void succeeded()
{
@ -340,42 +264,357 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
{
return callback.getInvocationType();
}
});
if (getState().isAsync())
};
boolean needed = _contentDemander.onContent(content);
boolean handle = onContent(content);
boolean endStream = frame.isEndStream();
if (endStream)
{
boolean handle = getState().onReadPossible();
return handle || wasDelayed ? this : null;
boolean handleContent = onContentComplete();
// This will generate EOF -> must happen before onContentProducible.
boolean handleRequest = onRequestComplete();
handle |= handleContent | handleRequest;
}
else
boolean woken = needed && getRequest().getHttpInput().onContentProducible();
handle |= woken;
if (LOG.isDebugEnabled())
{
getRequest().getHttpInput().unblock();
return wasDelayed ? this : null;
Stream stream = getStream();
LOG.debug("HTTP2 Request #{}/{}: {} bytes of {} content, woken: {}, needed: {}, handle: {}",
stream.getId(),
Integer.toHexString(stream.getSession().hashCode()),
length,
endStream ? "last" : "some",
woken,
needed,
handle);
}
boolean wasDelayed = _delayedUntilContent;
_delayedUntilContent = false;
return handle || wasDelayed ? this : null;
}
/**
* Demanding content is a marker content that is used to remember that a demand was
* registered into the stream. The {@code needed} flag indicates if the demand originated
* from a call to {@link #produceContent()} when false or {@link #needContent()}
* when true, as {@link HttpInput#onContentProducible()} must only be called
* only when {@link #needContent()} was called.
* Instances of this class must never escape the scope of this channel impl,
* so {@link #produceContent()} must never return one.
*/
private static final class DemandingContent extends HttpInput.SpecialContent
{
private final boolean needed;
private DemandingContent(boolean needed)
{
this.needed = needed;
}
}
@Override
public void produceContent()
{
// HttpInputOverHttp2 calls this method via produceRawContent();
// this is the equivalent of Http1 parseAndFill().
private static final HttpInput.Content EOF = new HttpInput.EofContent();
private static final HttpInput.Content DEMANDING_NEEDED = new DemandingContent(true);
private static final HttpInput.Content DEMANDING_NOT_NEEDED = new DemandingContent(false);
boolean[] endStreamResult = new boolean[1];
HttpInput.Content content = _requestContent.takeContentOrDemand(endStreamResult);
if (content != null)
private class ContentDemander
{
private final AtomicReference<HttpInput.Content> _content = new AtomicReference<>();
public void recycle()
{
onContent(content);
if (endStreamResult[0])
if (LOG.isDebugEnabled())
LOG.debug("recycle {}", this);
HttpInput.Content c = _content.getAndSet(null);
if (c != null && !c.isSpecial())
throw new AssertionError("unconsumed content: " + c);
}
public HttpInput.Content poll()
{
while (true)
{
onContentComplete();
onRequestComplete();
HttpInput.Content c = _content.get();
if (LOG.isDebugEnabled())
LOG.debug("poll, content = {}", c);
if (c == null || c.isSpecial() || _content.compareAndSet(c, c.isEof() ? EOF : null))
{
if (LOG.isDebugEnabled())
LOG.debug("returning current content");
return c;
}
}
}
public boolean demand(boolean needed)
{
while (true)
{
HttpInput.Content c = _content.get();
if (LOG.isDebugEnabled())
LOG.debug("demand({}), content = {}", needed, c);
if (c instanceof DemandingContent)
{
if (needed && !((DemandingContent)c).needed)
{
if (!_content.compareAndSet(c, DEMANDING_NEEDED))
{
if (LOG.isDebugEnabled())
LOG.debug("already demanding but switched needed flag to true");
continue;
}
}
if (LOG.isDebugEnabled())
LOG.debug("already demanding, returning false");
return false;
}
if (c != null)
{
if (LOG.isDebugEnabled())
LOG.debug("content available, returning true");
return true;
}
if (_content.compareAndSet(null, needed ? DEMANDING_NEEDED : DEMANDING_NOT_NEEDED))
{
IStream stream = getStream();
if (stream == null)
{
_content.set(null);
if (LOG.isDebugEnabled())
LOG.debug("no content available, switched to demanding but stream is now null");
return false;
}
if (LOG.isDebugEnabled())
LOG.debug("no content available, demanding stream {}", stream);
stream.demand(1);
c = _content.get();
boolean hasContent = !(c instanceof DemandingContent) && c != null;
if (LOG.isDebugEnabled())
LOG.debug("has content now? {}", hasContent);
return hasContent;
}
}
}
public boolean onContent(HttpInput.Content content)
{
while (true)
{
HttpInput.Content c = _content.get();
if (LOG.isDebugEnabled())
LOG.debug("content delivered by stream: {}, current content: {}", content, c);
if (c instanceof DemandingContent)
{
if (_content.compareAndSet(c, content))
{
boolean needed = ((DemandingContent)c).needed;
if (LOG.isDebugEnabled())
LOG.debug("replacing demand content with {} succeeded; returning {}", content, needed);
return needed;
}
}
else if (c == null)
{
if (!content.isSpecial())
{
// This should never happen, consider as a bug.
content.failed(new IllegalStateException("Non special content without demand : " + content));
return false;
}
if (_content.compareAndSet(null, content))
{
if (LOG.isDebugEnabled())
LOG.debug("replacing null content with {} succeeded", content);
return false;
}
}
else if (c.isEof() && content.isEof() && content.isEmpty())
{
content.succeeded();
return true;
}
else if (content.getError() != null)
{
if (c.getError() != null)
{
if (c.getError() != content.getError())
c.getError().addSuppressed(content.getError());
return true;
}
if (_content.compareAndSet(c, content))
{
c.failed(content.getError());
if (LOG.isDebugEnabled())
LOG.debug("replacing current content with {} succeeded", content);
return true;
}
}
else if (c.getError() != null && content.remaining() == 0)
{
content.succeeded();
return true;
}
else
{
// This should never happen, consider as a bug.
content.failed(new IllegalStateException("Cannot overwrite exiting content " + c + " with " + content));
return false;
}
}
}
public boolean onTimeout(Throwable failure)
{
while (true)
{
HttpInput.Content c = _content.get();
if (LOG.isDebugEnabled())
LOG.debug("onTimeout with current content: {} and failure = {}", c, failure);
if (!(c instanceof DemandingContent))
return false;
if (_content.compareAndSet(c, new HttpInput.ErrorContent(failure)))
{
if (LOG.isDebugEnabled())
LOG.debug("replacing current content with error succeeded");
return true;
}
}
}
public void eof()
{
while (true)
{
HttpInput.Content c = _content.get();
if (LOG.isDebugEnabled())
LOG.debug("eof with current content: {}", c);
if (c instanceof DemandingContent)
{
if (_content.compareAndSet(c, EOF))
{
if (LOG.isDebugEnabled())
LOG.debug("replacing current content with special EOF succeeded");
return;
}
}
else if (c == null)
{
if (_content.compareAndSet(null, EOF))
{
if (LOG.isDebugEnabled())
LOG.debug("replacing null content with special EOF succeeded");
return;
}
}
else if (c.isEof())
{
if (LOG.isDebugEnabled())
LOG.debug("current content already is EOF");
return;
}
else if (c.remaining() == 0)
{
if (_content.compareAndSet(c, EOF))
{
if (LOG.isDebugEnabled())
LOG.debug("replacing current content with special EOF succeeded");
return;
}
}
else
{
// EOF may arrive with HEADERS frame (e.g. a trailer) that is not flow controlled, so we need to wrap the existing content.
// Covered by HttpTrailersTest.testRequestTrailersWithContent.
HttpInput.Content content = new HttpInput.WrappingContent(c, true);
if (_content.compareAndSet(c, content))
{
if (LOG.isDebugEnabled())
LOG.debug("replacing current content with {} succeeded", content);
return;
}
}
}
}
public boolean failContent(Throwable failure)
{
while (true)
{
HttpInput.Content c = _content.get();
if (LOG.isDebugEnabled())
LOG.debug("failing current content: {} with failure: {}", c, failure);
if (c == null)
return false;
if (c.isSpecial())
return c.isEof();
if (_content.compareAndSet(c, null))
{
c.failed(failure);
if (LOG.isDebugEnabled())
LOG.debug("replacing current content with null succeeded");
return false;
}
}
}
@Override
public String toString()
{
return getClass().getSimpleName() + "@" + hashCode() + " _content=" + _content;
}
}
@Override
public void failContent(Throwable failure)
public boolean needContent()
{
getStream().fail(failure);
boolean hasContent = _contentDemander.demand(true);
if (LOG.isDebugEnabled())
LOG.debug("needContent has content? {}", hasContent);
return hasContent;
}
@Override
public HttpInput.Content produceContent()
{
HttpInput.Content content = null;
if (_contentDemander.demand(false))
content = _contentDemander.poll();
if (LOG.isDebugEnabled())
LOG.debug("produceContent produced {}", content);
return content;
}
@Override
public boolean failAllContent(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failing all content with {}", (Object)failure);
boolean atEof = getStream().failAllData(failure);
atEof |= _contentDemander.failContent(failure);
if (LOG.isDebugEnabled())
LOG.debug("failed all content, reached EOF? {}", atEof);
return atEof;
}
@Override
public boolean failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed " + x);
_contentDemander.onContent(new HttpInput.ErrorContent(x));
return getRequest().getHttpInput().onContentProducible();
}
@Override
protected boolean eof()
{
_contentDemander.eof();
return false;
}
@Override
@ -393,7 +632,10 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
System.lineSeparator(), trailers);
}
// This will generate EOF -> need to call onContentProducible.
boolean handle = onRequestComplete();
boolean woken = getRequest().getHttpInput().onContentProducible();
handle |= woken;
boolean wasDelayed = _delayedUntilContent;
_delayedUntilContent = false;
@ -412,25 +654,30 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
final boolean delayed = _delayedUntilContent;
_delayedUntilContent = false;
boolean result = isIdle();
if (result)
boolean reset = isIdle();
if (reset)
consumeInput();
getHttpTransport().onStreamTimeout(failure);
if (getRequest().getHttpInput().onIdleTimeout(failure) || delayed)
failure.addSuppressed(new Throwable("HttpInput idle timeout"));
_contentDemander.onTimeout(failure);
boolean needed = getRequest().getHttpInput().onContentProducible();
if (needed || delayed)
{
consumer.accept(this::handleWithContext);
result = false;
reset = false;
}
return result;
return reset;
}
@Override
public Runnable onFailure(Throwable failure, Callback callback)
{
getHttpTransport().onStreamFailure(failure);
boolean handle = getRequest().getHttpInput().failed(failure);
boolean handle = failed(failure);
consumeInput();
return new FailureTask(failure, callback, handle);
}

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.server.Authentication;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
@ -63,13 +64,33 @@ public class SpnegoAuthenticatorTest
}
@Override
public void produceContent()
public boolean failed(Throwable x)
{
return false;
}
@Override
public void failContent(Throwable failure)
protected boolean eof()
{
return false;
}
@Override
public boolean needContent()
{
return false;
}
@Override
public HttpInput.Content produceContent()
{
return null;
}
@Override
public boolean failAllContent(Throwable failure)
{
return false;
}
@Override
@ -108,13 +129,33 @@ public class SpnegoAuthenticatorTest
}
@Override
public void produceContent()
public boolean failed(Throwable x)
{
return false;
}
@Override
public void failContent(Throwable failure)
protected boolean eof()
{
return false;
}
@Override
public boolean needContent()
{
return false;
}
@Override
public HttpInput.Content produceContent()
{
return null;
}
@Override
public boolean failAllContent(Throwable failure)
{
return false;
}
@Override

View File

@ -0,0 +1,354 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Non-blocking {@link ContentProducer} implementation. Calling {@link #nextContent()} will never block
* but will return null when there is no available content.
*/
class AsyncContentProducer implements ContentProducer
{
private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class);
private final HttpChannel _httpChannel;
private HttpInput.Interceptor _interceptor;
private HttpInput.Content _rawContent;
private HttpInput.Content _transformedContent;
private boolean _error;
private long _firstByteTimeStamp = Long.MIN_VALUE;
private long _rawContentArrived;
AsyncContentProducer(HttpChannel httpChannel)
{
_httpChannel = httpChannel;
}
@Override
public void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycling {}", this);
_interceptor = null;
_rawContent = null;
_transformedContent = null;
_error = false;
_firstByteTimeStamp = Long.MIN_VALUE;
_rawContentArrived = 0L;
}
@Override
public HttpInput.Interceptor getInterceptor()
{
return _interceptor;
}
@Override
public void setInterceptor(HttpInput.Interceptor interceptor)
{
this._interceptor = interceptor;
}
@Override
public int available()
{
HttpInput.Content content = nextTransformedContent();
int available = content == null ? 0 : content.remaining();
if (LOG.isDebugEnabled())
LOG.debug("available = {}", available);
return available;
}
@Override
public boolean hasContent()
{
boolean hasContent = _rawContent != null;
if (LOG.isDebugEnabled())
LOG.debug("hasContent = {}", hasContent);
return hasContent;
}
@Override
public boolean isError()
{
if (LOG.isDebugEnabled())
LOG.debug("isError = {}", _error);
return _error;
}
@Override
public void checkMinDataRate()
{
long minRequestDataRate = _httpChannel.getHttpConfiguration().getMinRequestDataRate();
if (LOG.isDebugEnabled())
LOG.debug("checkMinDataRate [m={},t={}]", minRequestDataRate, _firstByteTimeStamp);
if (minRequestDataRate > 0 && _firstByteTimeStamp != Long.MIN_VALUE)
{
long period = System.nanoTime() - _firstByteTimeStamp;
if (period > 0)
{
long minimumData = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1);
if (getRawContentArrived() < minimumData)
{
if (LOG.isDebugEnabled())
LOG.debug("checkMinDataRate check failed");
BadMessageException bad = new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,
String.format("Request content data rate < %d B/s", minRequestDataRate));
if (_httpChannel.getState().isResponseCommitted())
{
if (LOG.isDebugEnabled())
LOG.debug("checkMinDataRate aborting channel");
_httpChannel.abort(bad);
}
failCurrentContent(bad);
throw bad;
}
}
}
}
@Override
public long getRawContentArrived()
{
if (LOG.isDebugEnabled())
LOG.debug("getRawContentArrived = {}", _rawContentArrived);
return _rawContentArrived;
}
@Override
public boolean consumeAll(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("consumeAll [e={}]", (Object)x);
failCurrentContent(x);
// A specific HttpChannel mechanism must be used as the following code
// does not guarantee that the channel will synchronously deliver all
// content it already contains:
// while (true)
// {
// HttpInput.Content content = _httpChannel.produceContent();
// ...
// }
// as the HttpChannel's produceContent() contract makes no such promise;
// for instance the H2 implementation calls Stream.demand() that may
// deliver the content asynchronously. Tests in StreamResetTest cover this.
boolean atEof = _httpChannel.failAllContent(x);
if (LOG.isDebugEnabled())
LOG.debug("failed all content of http channel; at EOF? {}", atEof);
return atEof;
}
private void failCurrentContent(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failing currently held content [r={},t={}]", _rawContent, _transformedContent, x);
if (_transformedContent != null && !_transformedContent.isSpecial())
{
if (_transformedContent != _rawContent)
{
_transformedContent.skip(_transformedContent.remaining());
_transformedContent.failed(x);
}
_transformedContent = null;
}
if (_rawContent != null && !_rawContent.isSpecial())
{
_rawContent.skip(_rawContent.remaining());
_rawContent.failed(x);
_rawContent = null;
}
}
@Override
public boolean onContentProducible()
{
if (LOG.isDebugEnabled())
LOG.debug("onContentProducible");
return _httpChannel.getState().onReadReady();
}
@Override
public HttpInput.Content nextContent()
{
HttpInput.Content content = nextTransformedContent();
if (LOG.isDebugEnabled())
LOG.debug("nextContent = {}", content);
if (content != null)
_httpChannel.getState().onReadIdle();
return content;
}
@Override
public void reclaim(HttpInput.Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("reclaim {} [t={}]", content, _transformedContent);
if (_transformedContent == content)
{
content.succeeded();
if (_transformedContent == _rawContent)
_rawContent = null;
_transformedContent = null;
}
}
@Override
public boolean isReady()
{
HttpInput.Content content = nextTransformedContent();
if (content == null)
{
_httpChannel.getState().onReadUnready();
if (_httpChannel.needContent())
{
content = nextTransformedContent();
if (LOG.isDebugEnabled())
LOG.debug("isReady got transformed content after needContent retry {}", content);
if (content != null)
_httpChannel.getState().onContentAdded();
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("isReady has no transformed content after needContent");
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("isReady got transformed content {}", content);
_httpChannel.getState().onContentAdded();
}
boolean ready = content != null;
if (LOG.isDebugEnabled())
LOG.debug("isReady = {}", ready);
return ready;
}
private HttpInput.Content nextTransformedContent()
{
if (LOG.isDebugEnabled())
LOG.debug("nextTransformedContent [r={},t={}]", _rawContent, _transformedContent);
if (_rawContent == null)
{
_rawContent = produceRawContent();
if (_rawContent == null)
return null;
}
if (_transformedContent != null && _transformedContent.isEmpty())
{
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("nulling depleted transformed content");
_transformedContent = null;
}
while (_transformedContent == null)
{
if (_rawContent.isSpecial())
{
// TODO does EOF need to be passed to the interceptors?
_error = _rawContent.getError() != null;
if (LOG.isDebugEnabled())
LOG.debug("raw content is special (with error = {}), returning it", _error);
return _rawContent;
}
if (_interceptor != null)
{
if (LOG.isDebugEnabled())
LOG.debug("using interceptor {} to transform raw content", _interceptor);
_transformedContent = _interceptor.readFrom(_rawContent);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("null interceptor, transformed content = raw content");
_transformedContent = _rawContent;
}
if (_transformedContent != null && _transformedContent.isEmpty())
{
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("nulling depleted transformed content");
_transformedContent = null;
}
if (_transformedContent == null)
{
if (_rawContent.isEmpty())
{
_rawContent.succeeded();
_rawContent = null;
if (LOG.isDebugEnabled())
LOG.debug("nulling depleted raw content");
_rawContent = produceRawContent();
if (_rawContent == null)
{
if (LOG.isDebugEnabled())
LOG.debug("produced null raw content, returning null");
return null;
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("raw content is not empty");
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("transformed content is not empty");
}
}
if (LOG.isDebugEnabled())
LOG.debug("returning transformed content {}", _transformedContent);
return _transformedContent;
}
private HttpInput.Content produceRawContent()
{
HttpInput.Content content = _httpChannel.produceContent();
if (content != null)
{
_rawContentArrived += content.remaining();
if (_firstByteTimeStamp == Long.MIN_VALUE)
_firstByteTimeStamp = System.nanoTime();
if (LOG.isDebugEnabled())
LOG.debug("produceRawContent updated rawContentArrived to {} and firstByteTimeStamp to {}", _rawContentArrived, _firstByteTimeStamp);
}
if (LOG.isDebugEnabled())
LOG.debug("produceRawContent produced {}", content);
return content;
}
}

View File

@ -0,0 +1,164 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Blocking implementation of {@link ContentProducer}. Calling {@link #nextContent()} will block when
* there is no available content but will never return null.
*/
class BlockingContentProducer implements ContentProducer
{
private static final Logger LOG = LoggerFactory.getLogger(BlockingContentProducer.class);
private final Semaphore _semaphore = new Semaphore(0);
private final AsyncContentProducer _asyncContentProducer;
BlockingContentProducer(AsyncContentProducer delegate)
{
_asyncContentProducer = delegate;
}
@Override
public void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycling {}", this);
_asyncContentProducer.recycle();
_semaphore.drainPermits();
}
@Override
public int available()
{
return _asyncContentProducer.available();
}
@Override
public boolean hasContent()
{
return _asyncContentProducer.hasContent();
}
@Override
public boolean isError()
{
return _asyncContentProducer.isError();
}
@Override
public void checkMinDataRate()
{
_asyncContentProducer.checkMinDataRate();
}
@Override
public long getRawContentArrived()
{
return _asyncContentProducer.getRawContentArrived();
}
@Override
public boolean consumeAll(Throwable x)
{
return _asyncContentProducer.consumeAll(x);
}
@Override
public HttpInput.Content nextContent()
{
while (true)
{
HttpInput.Content content = _asyncContentProducer.nextContent();
if (LOG.isDebugEnabled())
LOG.debug("nextContent async producer returned {}", content);
if (content != null)
return content;
// IFF isReady() returns false then HttpChannel.needContent() has been called,
// thus we know that eventually a call to onContentProducible will come.
if (_asyncContentProducer.isReady())
{
if (LOG.isDebugEnabled())
LOG.debug("nextContent async producer is ready, retrying");
continue;
}
if (LOG.isDebugEnabled())
LOG.debug("nextContent async producer is not ready, waiting on semaphore {}", _semaphore);
try
{
_semaphore.acquire();
}
catch (InterruptedException e)
{
return new HttpInput.ErrorContent(e);
}
}
}
@Override
public void reclaim(HttpInput.Content content)
{
_asyncContentProducer.reclaim(content);
}
@Override
public boolean isReady()
{
boolean ready = available() > 0;
if (LOG.isDebugEnabled())
LOG.debug("isReady = {}", ready);
return ready;
}
@Override
public HttpInput.Interceptor getInterceptor()
{
return _asyncContentProducer.getInterceptor();
}
@Override
public void setInterceptor(HttpInput.Interceptor interceptor)
{
_asyncContentProducer.setInterceptor(interceptor);
}
@Override
public boolean onContentProducible()
{
// In blocking mode, the dispatched thread normally does not have to be rescheduled as it is normally in state
// DISPATCHED blocked on the semaphore that just needs to be released for the dispatched thread to resume. This is why
// this method always returns false.
// But async errors can occur while the dispatched thread is NOT blocked reading (i.e.: in state WAITING),
// so the WAITING to WOKEN transition must be done by the error-notifying thread which then has to reschedule the
// dispatched thread after HttpChannelState.asyncError() is called.
// Calling _asyncContentProducer.wakeup() changes the channel state from WAITING to WOKEN which would prevent the
// subsequent call to HttpChannelState.asyncError() from rescheduling the thread.
// AsyncServletTest.testStartAsyncThenClientStreamIdleTimeout() tests this.
if (LOG.isDebugEnabled())
LOG.debug("onContentProducible releasing semaphore {}", _semaphore);
_semaphore.release();
return false;
}
}

View File

@ -0,0 +1,141 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
/**
* ContentProducer is the bridge between {@link HttpInput} and {@link HttpChannel}.
* It wraps a {@link HttpChannel} and uses the {@link HttpChannel#needContent()},
* {@link HttpChannel#produceContent()} and {@link HttpChannel#failAllContent(Throwable)}
* methods, tracks the current state of the channel's input by updating the
* {@link HttpChannelState} and provides the necessary mechanism to unblock
* the reader thread when using a blocking implementation or to know if the reader thread
* has to be rescheduled when using an async implementation.
*/
public interface ContentProducer
{
/**
* Reset all internal state and clear any held resources.
*/
void recycle();
/**
* Fail all content currently available in this {@link ContentProducer} instance
* as well as in the underlying {@link HttpChannel}.
*
* This call is always non-blocking.
* Doesn't change state.
* @return true if EOF was reached.
*/
boolean consumeAll(Throwable x);
/**
* Check if the current data rate consumption is above the minimal rate.
* Abort the channel, fail the content currently available and throw a
* BadMessageException(REQUEST_TIMEOUT_408) if the check fails.
*/
void checkMinDataRate();
/**
* Get the byte count produced by the underlying {@link HttpChannel}.
*
* This call is always non-blocking.
* Doesn't change state.
* @return the byte count produced by the underlying {@link HttpChannel}.
*/
long getRawContentArrived();
/**
* Get the byte count that can immediately be read from this
* {@link ContentProducer} instance or the underlying {@link HttpChannel}.
*
* This call is always non-blocking.
* Doesn't change state.
* @return the available byte count.
*/
int available();
/**
* Check if this {@link ContentProducer} instance contains some
* content without querying the underlying {@link HttpChannel}.
*
* This call is always non-blocking.
* Doesn't change state.
* Doesn't query the HttpChannel.
* @return true if this {@link ContentProducer} instance contains content, false otherwise.
*/
boolean hasContent();
/**
* Check if the underlying {@link HttpChannel} reached an error content.
* This call is always non-blocking.
* Doesn't change state.
* Doesn't query the HttpChannel.
* @return true if the underlying {@link HttpChannel} reached an error content, false otherwise.
*/
boolean isError();
/**
* Get the next content that can be read from or that describes the special condition
* that was reached (error, eof).
* This call may or may not block until some content is available, depending on the implementation.
* The returned content is decoded by the interceptor set with {@link #setInterceptor(HttpInput.Interceptor)}
* or left as-is if no intercept is set.
* After this call, state can be either of UNREADY or IDLE.
* @return the next content that can be read from or null if the implementation does not block
* and has no available content.
*/
HttpInput.Content nextContent();
/**
* Free up the content by calling {@link HttpInput.Content#succeeded()} on it
* and updating this instance' internal state.
*/
void reclaim(HttpInput.Content content);
/**
* Check if this {@link ContentProducer} instance has some content that can be read without blocking.
* If there is some, the next call to {@link #nextContent()} will not block.
* If there isn't any and the implementation does not block, this method will trigger a
* {@link javax.servlet.ReadListener} callback once some content is available.
* This call is always non-blocking.
* After this call, state can be either of UNREADY or READY.
* @return true if some content is immediately available, false otherwise.
*/
boolean isReady();
/**
* Get the {@link org.eclipse.jetty.server.HttpInput.Interceptor}.
* @return The {@link org.eclipse.jetty.server.HttpInput.Interceptor}, or null if none set.
*/
HttpInput.Interceptor getInterceptor();
/**
* Set the interceptor.
* @param interceptor The interceptor to use.
*/
void setInterceptor(HttpInput.Interceptor interceptor);
/**
* Wake up the thread that is waiting for the next content.
* After this call, state can be READY.
* @return true if the thread has to be rescheduled, false otherwise.
*/
boolean onContentProducible();
}

View File

@ -124,9 +124,47 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
return new HttpInput(state);
}
public abstract void produceContent();
/**
* Notify the channel that content is needed. If some content is immediately available, true is returned and
* {@link #produceContent()} has to be called and will return a non-null object.
* If no content is immediately available, {@link HttpInput#onContentProducible()} is called once some content arrives
* and {@link #produceContent()} can be called without returning null.
* If a failure happens, then {@link HttpInput#onContentProducible()} will be called and an error content will return the
* error on the next call to {@link #produceContent()}.
* @return true if content is immediately available.
*/
public abstract boolean needContent();
public abstract void failContent(Throwable failure);
/**
* Produce a {@link HttpInput.Content} object with data currently stored within the channel. The produced content
* can be special (meaning calling {@link HttpInput.Content#isSpecial()} returns true) if the channel reached a special
* state, like EOF or an error.
* Once a special content has been returned, all subsequent calls to this method will always return a special content
* of the same kind and {@link #needContent()} will always return true.
* The returned content is "raw", i.e.: not decoded.
* @return a {@link HttpInput.Content} object if one is immediately available without blocking, null otherwise.
*/
public abstract HttpInput.Content produceContent();
/**
* Fail all content that is currently stored within the channel.
* @param failure the failure to fail the content with.
* @return true if EOF was reached while failing all content, false otherwise.
*/
public abstract boolean failAllContent(Throwable failure);
/**
* Fail the channel's input.
* @param failure the failure.
* @return true if the channel needs to be rescheduled.
*/
public abstract boolean failed(Throwable failure);
/**
* Mark the channel's input as EOF.
* @return true if the channel needs to be rescheduled.
*/
protected abstract boolean eof();
protected HttpOutput newHttpOutput()
{
@ -307,19 +345,6 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
_transientListeners.clear();
}
public void onAsyncWaitForContent()
{
}
public void onBlockWaitForContent()
{
}
public void onBlockWaitForContentFailure(Throwable failure)
{
getRequest().getHttpInput().failed(failure);
}
@Override
public void run()
{
@ -449,18 +474,6 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
throw _state.getAsyncContextEvent().getThrowable();
}
case READ_REGISTER:
{
onAsyncWaitForContent();
break;
}
case READ_PRODUCE:
{
_request.getHttpInput().asyncReadProduce();
break;
}
case READ_CALLBACK:
{
ContextHandler handler = _state.getContextHandler();
@ -705,12 +718,12 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
request.getFields());
}
public void onContent(HttpInput.Content content)
public boolean onContent(HttpInput.Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("onContent {} {}", this, content);
_combinedListener.onRequestContent(_request, content.getByteBuffer());
_request.getHttpInput().addContent(content);
return false;
}
public boolean onContentComplete()
@ -733,7 +746,7 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
{
if (LOG.isDebugEnabled())
LOG.debug("onRequestComplete {}", this);
boolean result = _request.getHttpInput().eof();
boolean result = eof();
_combinedListener.onRequestEnd(_request);
return result;
}
@ -769,11 +782,6 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
_transport.onCompleted();
}
public boolean onEarlyEOF()
{
return _request.getHttpInput().earlyEOF();
}
public void onBadMessage(BadMessageException failure)
{
int status = failure.getCode();
@ -950,7 +958,7 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
return null;
}
public void execute(Runnable task)
protected void execute(Runnable task)
{
_executor.execute(task);
}

View File

@ -40,6 +40,7 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,6 +51,7 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
{
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverHttp.class);
private static final HttpField PREAMBLE_UPGRADE_H2C = new HttpField(HttpHeader.UPGRADE, "h2c");
private static final HttpInput.Content EOF = new HttpInput.EofContent();
private final HttpConnection _httpConnection;
private final RequestBuilder _requestBuilder = new RequestBuilder();
private MetaData.Request _metadata;
@ -61,6 +63,14 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
private boolean _expect102Processing = false;
private List<String> _complianceViolations;
private HttpFields.Mutable _trailers;
// Field _content doesn't need to be volatile nor protected by a lock
// as it is always accessed by the same thread, i.e.: we get notified by onFillable
// that the socket contains new bytes and either schedule an onDataAvailable
// call that is going to read the socket or release the blocking semaphore to wake up
// the blocked reader and make it read the socket. The same logic is true for async
// events like timeout: we get notified and either schedule onError or release the
// blocking semaphore.
private HttpInput.Content _content;
public HttpChannelOverHttp(HttpConnection httpConnection, Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport)
{
@ -76,15 +86,76 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
}
@Override
public void produceContent()
public boolean needContent()
{
((HttpConnection)getEndPoint().getConnection()).parseAndFillForContent();
if (_content != null)
{
if (LOG.isDebugEnabled())
LOG.debug("needContent has content immediately available: {}", _content);
return true;
}
_httpConnection.parseAndFillForContent();
if (_content != null)
{
if (LOG.isDebugEnabled())
LOG.debug("needContent has content after parseAndFillForContent: {}", _content);
return true;
}
if (LOG.isDebugEnabled())
LOG.debug("needContent has no content");
_httpConnection.asyncReadFillInterested();
return false;
}
@Override
public void failContent(Throwable failure)
public HttpInput.Content produceContent()
{
((HttpConnection)getEndPoint().getConnection()).failContent(failure);
if (_content == null)
{
if (LOG.isDebugEnabled())
LOG.debug("produceContent has no content, parsing and filling");
_httpConnection.parseAndFillForContent();
}
HttpInput.Content result = _content;
if (result != null && !result.isSpecial())
_content = result.isEof() ? EOF : null;
if (LOG.isDebugEnabled())
LOG.debug("produceContent produced {}", result);
return result;
}
@Override
public boolean failAllContent(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failing all content with {}", (Object)failure);
if (_content != null && !_content.isSpecial())
{
_content.failed(failure);
_content = _content.isEof() ? EOF : null;
if (_content == EOF)
return true;
}
while (true)
{
HttpInput.Content c = produceContent();
if (c == null)
{
if (LOG.isDebugEnabled())
LOG.debug("failed all content, EOF was not reached");
return false;
}
c.skip(c.remaining());
c.failed(failure);
if (c.isSpecial())
{
boolean atEof = c.isEof();
if (LOG.isDebugEnabled())
LOG.debug("failed all content, EOF = {}", atEof);
return atEof;
}
}
}
@Override
@ -97,7 +168,7 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
if (_metadata == null)
_metadata = _requestBuilder.build();
onRequest(_metadata);
getRequest().getHttpInput().earlyEOF();
markEarlyEOF();
}
catch (Exception e)
{
@ -108,10 +179,22 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
}
@Override
public boolean content(ByteBuffer content)
public boolean content(ByteBuffer buffer)
{
onContent(_httpConnection.newContent(content));
_delayedForContent = false;
HttpInput.Content content = _httpConnection.newContent(buffer);
if (_content != null)
{
if (_content.isSpecial())
content.failed(_content.getError());
else
throw new AssertionError("Cannot overwrite exiting content " + _content + " with " + content);
}
else
{
_content = content;
onContent(_content);
_delayedForContent = false;
}
return true;
}
@ -158,12 +241,69 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
_httpConnection.getGenerator().setPersistent(false);
// If we have no request yet, just close
if (_metadata == null)
_httpConnection.close();
else if (onEarlyEOF() || _delayedForContent)
{
_delayedForContent = false;
handle();
_httpConnection.close();
}
else
{
markEarlyEOF();
if (_delayedForContent)
{
_delayedForContent = false;
handle();
}
}
}
private void markEarlyEOF()
{
if (LOG.isDebugEnabled())
LOG.debug("received early EOF, content = {}", _content);
EofException failure = new EofException("Early EOF");
if (_content != null)
_content.failed(failure);
_content = new HttpInput.ErrorContent(failure);
}
@Override
protected boolean eof()
{
if (LOG.isDebugEnabled())
LOG.debug("received EOF, content = {}", _content);
if (_content == null)
{
_content = EOF;
}
else
{
HttpInput.Content c = _content;
_content = new HttpInput.WrappingContent(c, true);
}
return false;
}
@Override
public boolean failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed {}, content = {}", x, _content);
Throwable error = null;
if (_content != null && _content.isSpecial())
error = _content.getError();
if (error != null && error != x)
{
error.addSuppressed(x);
}
else
{
if (_content != null)
_content.failed(x);
_content = new HttpInput.ErrorContent(x);
}
return getRequest().getHttpInput().onContentProducible();
}
@Override
@ -320,24 +460,6 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
return onRequestComplete();
}
@Override
public void onAsyncWaitForContent()
{
_httpConnection.asyncReadFillInterested();
}
@Override
public void onBlockWaitForContent()
{
_httpConnection.blockingReadFillInterested();
}
@Override
public void onBlockWaitForContentFailure(Throwable failure)
{
_httpConnection.blockingReadFailure(failure);
}
@Override
public void onComplianceViolation(ComplianceViolation.Mode mode, ComplianceViolation violation, String details)
{
@ -445,6 +567,9 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
_upgrade = null;
_trailers = null;
_metadata = null;
if (_content != null && !_content.isSpecial())
throw new AssertionError("unconsumed content: " + _content);
_content = null;
}
@Override
@ -539,13 +664,24 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
if (_delayedForContent)
{
_delayedForContent = false;
getRequest().getHttpInput().onIdleTimeout(timeout);
doOnIdleTimeout(timeout);
execute(this);
return false;
}
return true;
}
private void doOnIdleTimeout(Throwable x)
{
boolean neverDispatched = getState().isIdle();
boolean waitingForContent = _content == null || _content.remaining() == 0;
if ((waitingForContent || neverDispatched) && (_content == null || !_content.isSpecial()))
{
x.addSuppressed(new Throwable("HttpInput idle timeout"));
_content = new HttpInput.ErrorContent(x);
}
}
private static class RequestBuilder
{
private final HttpFields.Mutable _fieldsBuilder = HttpFields.build();

View File

@ -107,12 +107,9 @@ public class HttpChannelState
*/
private enum InputState
{
IDLE, // No isReady; No data
REGISTER, // isReady()==false handling; No data
REGISTERED, // isReady()==false !handling; No data
POSSIBLE, // isReady()==false async read callback called (http/1 only)
PRODUCING, // isReady()==false READ_PRODUCE action is being handled (http/1 only)
READY // isReady() was false, onContentAdded has been called
IDLE, // No isReady; No data
UNREADY, // isReady()==false; No data
READY // isReady() was false; data is available
}
/*
@ -137,8 +134,6 @@ public class HttpChannelState
ASYNC_ERROR, // handle an async error
ASYNC_TIMEOUT, // call asyncContext onTimeout
WRITE_CALLBACK, // handle an IO write callback
READ_REGISTER, // Register for fill interest
READ_PRODUCE, // Check is a read is possible by parsing/filling
READ_CALLBACK, // handle an IO read callback
COMPLETE, // Complete the response by closing output
TERMINATED, // No further actions
@ -465,19 +460,12 @@ public class HttpChannelState
case ASYNC:
switch (_inputState)
{
case POSSIBLE:
_inputState = InputState.PRODUCING;
return Action.READ_PRODUCE;
case IDLE:
case UNREADY:
break;
case READY:
_inputState = InputState.IDLE;
return Action.READ_CALLBACK;
case REGISTER:
case PRODUCING:
_inputState = InputState.REGISTERED;
return Action.READ_REGISTER;
case IDLE:
case REGISTERED:
break;
default:
throw new IllegalStateException(getStatusStringLocked());
@ -1222,99 +1210,8 @@ public class HttpChannelState
_channel.getRequest().setAttribute(name, attribute);
}
/**
* Called to signal async read isReady() has returned false.
* This indicates that there is no content available to be consumed
* and that once the channel enters the ASYNC_WAIT state it will
* register for read interest by calling {@link HttpChannel#onAsyncWaitForContent()}
* either from this method or from a subsequent call to {@link #unhandle()}.
*/
public void onReadUnready()
{
boolean interested = false;
try (AutoLock l = lock())
{
if (LOG.isDebugEnabled())
LOG.debug("onReadUnready {}", toStringLocked());
switch (_inputState)
{
case IDLE:
case READY:
if (_state == State.WAITING)
{
interested = true;
_inputState = InputState.REGISTERED;
}
else
{
_inputState = InputState.REGISTER;
}
break;
case REGISTER:
case REGISTERED:
case POSSIBLE:
case PRODUCING:
break;
default:
throw new IllegalStateException(toStringLocked());
}
}
if (interested)
_channel.onAsyncWaitForContent();
}
/**
* Called to signal that content is now available to read.
* If the channel is in ASYNC_WAIT state and unready (ie isReady() has
* returned false), then the state is changed to ASYNC_WOKEN and true
* is returned.
*
* @return True IFF the channel was unready and in ASYNC_WAIT state
*/
public boolean onContentAdded()
{
boolean woken = false;
try (AutoLock l = lock())
{
if (LOG.isDebugEnabled())
LOG.debug("onContentAdded {}", toStringLocked());
switch (_inputState)
{
case IDLE:
case READY:
break;
case PRODUCING:
_inputState = InputState.READY;
break;
case REGISTER:
case REGISTERED:
_inputState = InputState.READY;
if (_state == State.WAITING)
{
woken = true;
_state = State.WOKEN;
}
break;
default:
throw new IllegalStateException(toStringLocked());
}
}
return woken;
}
/**
* Called to signal that the channel is ready for a callback.
* This is similar to calling {@link #onReadUnready()} followed by
* {@link #onContentAdded()}, except that as content is already
* available, read interest is never set.
*
* @return true if woken
*/
@ -1328,7 +1225,11 @@ public class HttpChannelState
switch (_inputState)
{
case READY:
_inputState = InputState.READY;
break;
case IDLE:
case UNREADY:
_inputState = InputState.READY;
if (_state == State.WAITING)
{
@ -1344,25 +1245,20 @@ public class HttpChannelState
return woken;
}
/**
* Called to indicate that more content may be available,
* but that a handling thread may need to produce (fill/parse)
* it. Typically called by the async read success callback.
*
* @return {@code true} if more content may be available
*/
public boolean onReadPossible()
public boolean onReadEof()
{
boolean woken = false;
try (AutoLock l = lock())
{
if (LOG.isDebugEnabled())
LOG.debug("onReadPossible {}", toStringLocked());
LOG.debug("onReadEof {}", toStringLocked());
switch (_inputState)
{
case REGISTERED:
_inputState = InputState.POSSIBLE;
case IDLE:
case READY:
case UNREADY:
_inputState = InputState.READY;
if (_state == State.WAITING)
{
woken = true;
@ -1370,11 +1266,6 @@ public class HttpChannelState
}
break;
case IDLE:
case READY:
case REGISTER:
break;
default:
throw new IllegalStateException(toStringLocked());
}
@ -1382,29 +1273,72 @@ public class HttpChannelState
return woken;
}
/**
* Called to signal that a read has read -1.
* Will wake if the read was called while in ASYNC_WAIT state
*
* @return {@code true} if woken
*/
public boolean onReadEof()
public void onContentAdded()
{
boolean woken = false;
try (AutoLock l = lock())
{
if (LOG.isDebugEnabled())
LOG.debug("onEof {}", toStringLocked());
LOG.debug("onContentAdded {}", toStringLocked());
// Force read ready so onAllDataRead can be called
_inputState = InputState.READY;
if (_state == State.WAITING)
switch (_inputState)
{
woken = true;
_state = State.WOKEN;
case IDLE:
case UNREADY:
case READY:
_inputState = InputState.READY;
break;
default:
throw new IllegalStateException(toStringLocked());
}
}
}
public void onReadIdle()
{
try (AutoLock l = lock())
{
if (LOG.isDebugEnabled())
LOG.debug("onReadIdle {}", toStringLocked());
switch (_inputState)
{
case UNREADY:
case READY:
case IDLE:
_inputState = InputState.IDLE;
break;
default:
throw new IllegalStateException(toStringLocked());
}
}
}
/**
* Called to indicate that more content may be available,
* but that a handling thread may need to produce (fill/parse)
* it. Typically called by the async read success callback.
*/
public void onReadUnready()
{
try (AutoLock l = lock())
{
if (LOG.isDebugEnabled())
LOG.debug("onReadUnready {}", toStringLocked());
switch (_inputState)
{
case IDLE:
case UNREADY:
case READY: // READY->UNREADY is needed by AsyncServletIOTest.testStolenAsyncRead
_inputState = InputState.UNREADY;
break;
default:
throw new IllegalStateException(toStringLocked());
}
}
return woken;
}
public boolean onWritePossible()

View File

@ -0,0 +1,84 @@
@startuml
title HttpChannelState
note top of onReadReady_inputState: onReadReady
state "input state" as onReadReady_inputState {
state "IDLE" as onReadReady_IDLE
state "UNREADY" as onReadReady_UNREADY
state "READY" as onReadReady_READY
state "channel state" as onReadReady_channelState {
state "WAITING" as onReadReady_WAITING
state "WOKEN" as onReadReady_WOKEN
onReadReady_WAITING --> onReadReady_WOKEN
}
onReadReady_IDLE --> onReadReady_channelState
onReadReady_UNREADY --> onReadReady_channelState
onReadReady_channelState --> onReadReady_READY
onReadReady_READY --> onReadReady_READY
}
note top of onReadEof_inputState: onReadEof
state "input state" as onReadEof_inputState {
state "IDLE" as onReadEof_IDLE
state "UNREADY" as onReadEof_UNREADY
state "READY" as onReadEof_READY
state "channel state" as onReadEof_channelState {
state "WAITING" as onReadEof_WAITING
state "WOKEN" as onReadEof_WOKEN
onReadEof_WAITING --> onReadEof_WOKEN
}
onReadEof_IDLE --> onReadEof_channelState
onReadEof_UNREADY --> onReadEof_channelState
onReadEof_READY --> onReadEof_channelState
onReadEof_channelState --> onReadEof_READY
}
note top of onReadIdle_inputState: onReadIdle
state "input state" as onReadIdle_inputState {
state "IDLE" as onReadIdle_IDLE
state "UNREADY" as onReadIdle_UNREADY
state "READY" as onReadIdle_READY
onReadIdle_IDLE --> onReadIdle_IDLE
onReadIdle_UNREADY --> onReadIdle_IDLE
onReadIdle_READY --> onReadIdle_IDLE
}
note top of onReadUnready_inputState: onReadUnready
state "input state" as onReadUnready_inputState {
state "IDLE" as onReadUnready_IDLE
state "UNREADY" as onReadUnready_UNREADY
state "READY" as onReadUnready_READY
onReadUnready_IDLE --> onReadUnready_UNREADY
onReadUnready_UNREADY --> onReadUnready_UNREADY
onReadUnready_READY --> onReadUnready_UNREADY
}
note top of onContentAdded_inputState: onContentAdded
state "input state" as onContentAdded_inputState {
state "IDLE" as onContentAdded_IDLE
state "UNREADY" as onContentAdded_UNREADY
state "READY" as onContentAdded_READY
onContentAdded_IDLE --> onContentAdded_READY
onContentAdded_UNREADY --> onContentAdded_READY
onContentAdded_READY --> onContentAdded_READY
}
@enduml

View File

@ -33,8 +33,6 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpParser.RequestHandler;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.AbstractConnection;
@ -69,7 +67,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private final HttpParser _parser;
private final AtomicInteger _contentBufferReferences = new AtomicInteger();
private volatile ByteBuffer _requestBuffer = null;
private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback();
private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
private final SendCallback _sendCallback = new SendCallback();
private final boolean _recordHttpComplianceViolations;
@ -321,27 +318,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
*/
void parseAndFillForContent()
{
// parseRequestBuffer() must always be called after fillRequestBuffer() otherwise this method doesn't trigger EOF/earlyEOF
// which breaks AsyncRequestReadTest.testPartialReadThenShutdown()
// When fillRequestBuffer() is called, it must always be followed by a parseRequestBuffer() call otherwise this method
// doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown()
int filled = Integer.MAX_VALUE;
while (_parser.inContentState())
{
boolean handled = parseRequestBuffer();
if (handled || filled <= 0 || _input.hasContent())
break;
filled = fillRequestBuffer();
}
}
void failContent(Throwable failure)
{
int filled = Integer.MAX_VALUE;
while (_parser.inContentState())
{
// The parser is going generate and forward contents to the HttpInput
// so it's up to it to fail them individually.
parseRequestBuffer();
if (filled <= 0 || _input.hasContent())
if (handled || filled <= 0)
break;
filled = fillRequestBuffer();
}
@ -614,25 +597,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
public void asyncReadFillInterested()
{
getEndPoint().fillInterested(_asyncReadCallback);
}
public void blockingReadFillInterested()
{
// We try fillInterested here because of SSL and
// spurious wakeups. With blocking reads, we read in a loop
// that tries to read/parse content and blocks waiting if there is
// none available. The loop can be woken up by incoming encrypted
// bytes, which due to SSL might not produce any decrypted bytes.
// Thus the loop needs to register fill interest again. However if
// the loop is woken up spuriously, then the register interest again
// can result in a pending read exception, unless we use tryFillInterested.
getEndPoint().tryFillInterested(_blockingReadCallback);
}
public void blockingReadFailure(Throwable e)
{
_blockingReadCallback.failed(e);
getEndPoint().tryFillInterested(_asyncReadCallback);
}
@Override
@ -687,44 +652,30 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
private class BlockingReadCallback implements Callback
{
@Override
public void succeeded()
{
_input.unblock();
}
@Override
public void failed(Throwable x)
{
_input.failed(x);
}
@Override
public InvocationType getInvocationType()
{
// This callback does not block, rather it wakes up the
// thread that is blocked waiting on the read.
return InvocationType.NON_BLOCKING;
}
}
private class AsyncReadCallback implements Callback
{
@Override
public void succeeded()
{
if (_channel.getState().onReadPossible())
if (_channel.getRequest().getHttpInput().onContentProducible())
_channel.handle();
}
@Override
public void failed(Throwable x)
{
if (_input.failed(x))
if (_channel.failed(x))
_channel.handle();
}
@Override
public InvocationType getInvocationType()
{
// This callback does not block when the HttpInput is in blocking mode,
// rather it wakes up the thread that is blocked waiting on the read;
// but it can if it is in async mode, hence the varying InvocationType.
return _channel.getRequest().getHttpInput().isAsync() ? InvocationType.BLOCKING : InvocationType.NON_BLOCKING;
}
}
private class SendCallback extends IteratingCallback

View File

@ -0,0 +1,16 @@
@startuml
IDLE:
READY:
UNREADY:
[*] --> IDLE
IDLE --> UNREADY : isReady
IDLE -right->READY : isReady
UNREADY -up-> READY : ASYNC onContentProducible
READY -left->IDLE : nextContent
@enduml

View File

@ -0,0 +1,114 @@
@startuml
title "HttpInput"
participant AsyncContentDelivery as "[async\ncontent\ndelivery]"
participant HttpChannel as "Http\nChannel\n"
participant HttpChannelState as "Http\nChannel\nState"
participant HttpInputInterceptor as "Http\nInput.\nInterceptor"
participant AsyncContentProducer as "Async\nContent\nProducer"
participant HttpInput as "Http\nInput\n"
participant Application as "\nApplication\n"
autoactivate on
== Async Read ==
Application->HttpInput: read
activate Application
HttpInput->AsyncContentProducer: nextContent
AsyncContentProducer->AsyncContentProducer: next\nTransformed\nContent
AsyncContentProducer->HttpChannel: produceContent
return raw content or null
alt if raw content is not null
AsyncContentProducer->HttpInputInterceptor: readFrom
return transformed content
end
return
alt if transformed content is not null
AsyncContentProducer->HttpChannelState: onReadIdle
return
end
return content or null
note over HttpInput
throw ISE
if content
is null
end note
HttpInput->AsyncContentProducer: reclaim
return
return
deactivate Application
== isReady ==
Application->HttpInput: isReady
activate Application
HttpInput->AsyncContentProducer: isReady
AsyncContentProducer->AsyncContentProducer: next\nTransformed\nContent
AsyncContentProducer->HttpChannel: produceContent
return raw content or null
alt if raw content is not null
AsyncContentProducer->HttpInputInterceptor: readFrom
return transformed content
end
return
alt if transformed content is not null
AsyncContentProducer->HttpChannelState: onContentAdded
return
else transformed content is null
AsyncContentProducer->HttpChannelState: onReadUnready
return
AsyncContentProducer->HttpChannel: needContent
return
alt if needContent returns true
AsyncContentProducer->AsyncContentProducer: next\nTransformed\nContent
return
alt if transformed content is not null
AsyncContentProducer->HttpChannelState: onContentAdded
return
end
end
end
return boolean\n[transformed\ncontent is not null]
return
deactivate Application
alt if content arrives
AsyncContentDelivery->HttpInput: onContentProducible
HttpInput->AsyncContentProducer: onContentProducible
alt if not at EOF
AsyncContentProducer->HttpChannelState: onReadReady
return true if woken
else if at EOF
AsyncContentProducer->HttpChannelState: onReadEof
return true if woken
end
return true if woken
return true if woken
alt onContentProducible returns true
AsyncContentDelivery->HttpChannel: execute(HttpChannel)
return
end
end
|||
== available ==
Application->HttpInput: available
activate Application
HttpInput->AsyncContentProducer: available
AsyncContentProducer->AsyncContentProducer: next\nTransformed\nContent
AsyncContentProducer->HttpChannel: produceContent
return raw content or null
alt if raw content is not null
AsyncContentProducer->HttpInputInterceptor: readFrom
return transformed content
end
return
return content size or\n0 if content is null
return
deactivate Application
|||
@enduml

View File

@ -0,0 +1,64 @@
@startuml
title "HttpInput"
participant AsyncContentDelivery as "[async\ncontent\ndelivery]"
participant HttpChannel as "Http\nChannel\n"
participant HttpChannelState as "Http\nChannel\nState"
participant AsyncContentProducer as "Async\nContent\nProducer"
participant Semaphore as "\nSemaphore\n"
participant BlockingContentProducer as "Blocking\nContent\nProducer"
participant HttpInput as "Http\nInput\n"
participant Application as "\nApplication\n"
autoactivate on
== Blocking Read ==
Application->HttpInput: read
activate Application
HttpInput->BlockingContentProducer: nextContent
loop
BlockingContentProducer->AsyncContentProducer: nextContent
AsyncContentProducer->AsyncContentProducer: nextTransformedContent
AsyncContentProducer->HttpChannel: produceContent
return
return
alt content is not null
AsyncContentProducer->HttpChannelState: onReadIdle
return
end
return content or null
alt content is null
BlockingContentProducer->HttpChannelState: onReadUnready
return
BlockingContentProducer->HttpChannel: needContent
return
alt needContent returns false
BlockingContentProducer->Semaphore: acquire
return
else needContent returns true
note over BlockingContentProducer
continue loop
end note
end
else content is not null
return non-null content
end
end
' return from BlockingContentProducer: nextContent
HttpInput->BlockingContentProducer: reclaim
BlockingContentProducer->AsyncContentProducer: reclaim
return
return
return
deactivate Application
alt if content arrives
AsyncContentDelivery->HttpInput: wakeup
HttpInput->BlockingContentProducer: wakeup
BlockingContentProducer->Semaphore: release
return
return false
return false
end
@enduml

View File

@ -749,7 +749,7 @@ public class Request implements HttpServletRequest
public long getContentRead()
{
return _input.getContentLength();
return _input.getContentReceived();
}
@Override

View File

@ -0,0 +1,340 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor;
import org.eclipse.jetty.util.compression.CompressionPool;
import org.eclipse.jetty.util.compression.InflaterPool;
import org.hamcrest.core.Is;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class AsyncContentProducerTest
{
private ScheduledExecutorService scheduledExecutorService;
private InflaterPool inflaterPool;
@BeforeEach
public void setUp()
{
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
inflaterPool = new InflaterPool(-1, true);
}
@AfterEach
public void tearDown()
{
scheduledExecutorService.shutdownNow();
}
@Test
public void testAsyncContentProducerNoInterceptor() throws Exception
{
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(buffers);
final String originalContentString = asString(buffers);
CyclicBarrier barrier = new CyclicBarrier(2);
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
assertThat(error, nullValue());
}
@Test
public void testAsyncContentProducerNoInterceptorWithError() throws Exception
{
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(buffers);
final String originalContentString = asString(buffers);
final Throwable expectedError = new EofException("Early EOF");
CyclicBarrier barrier = new CyclicBarrier(2);
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
assertThat(error, Is.is(expectedError));
}
@Test
public void testAsyncContentProducerGzipInterceptor() throws Exception
{
ByteBuffer[] uncompressedBuffers = new ByteBuffer[3];
uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(uncompressedBuffers);
final String originalContentString = asString(uncompressedBuffers);
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = gzipByteBuffer(uncompressedBuffers[0]);
buffers[1] = gzipByteBuffer(uncompressedBuffers[1]);
buffers[2] = gzipByteBuffer(uncompressedBuffers[2]);
CyclicBarrier barrier = new CyclicBarrier(2);
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier));
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
assertThat(error, nullValue());
}
@Test
public void testAsyncContentProducerGzipInterceptorWithTinyBuffers() throws Exception
{
ByteBuffer[] uncompressedBuffers = new ByteBuffer[3];
uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(uncompressedBuffers);
final String originalContentString = asString(uncompressedBuffers);
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = gzipByteBuffer(uncompressedBuffers[0]);
buffers[1] = gzipByteBuffer(uncompressedBuffers[1]);
buffers[2] = gzipByteBuffer(uncompressedBuffers[2]);
CyclicBarrier barrier = new CyclicBarrier(2);
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier));
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, totalContentBytesCount + buffers.length + 2, 25, 4, barrier);
assertThat(error, nullValue());
}
@Test
public void testBlockingContentProducerGzipInterceptorWithError() throws Exception
{
ByteBuffer[] uncompressedBuffers = new ByteBuffer[3];
uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(uncompressedBuffers);
final String originalContentString = asString(uncompressedBuffers);
final Throwable expectedError = new Throwable("HttpInput idle timeout");
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = gzipByteBuffer(uncompressedBuffers[0]);
buffers[1] = gzipByteBuffer(uncompressedBuffers[1]);
buffers[2] = gzipByteBuffer(uncompressedBuffers[2]);
CyclicBarrier barrier = new CyclicBarrier(2);
ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier));
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier);
assertThat(error, Is.is(expectedError));
}
private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, ContentProducer contentProducer, int totalContentCount, int readyCount, int notReadyCount, CyclicBarrier barrier) throws InterruptedException, BrokenBarrierException, TimeoutException
{
int readBytes = 0;
String consumedString = "";
int nextContentCount = 0;
int isReadyFalseCount = 0;
int isReadyTrueCount = 0;
Throwable error = null;
while (true)
{
if (contentProducer.isReady())
isReadyTrueCount++;
else
isReadyFalseCount++;
HttpInput.Content content = contentProducer.nextContent();
nextContentCount++;
if (content == null)
{
barrier.await(5, TimeUnit.SECONDS);
content = contentProducer.nextContent();
nextContentCount++;
}
assertThat(content, notNullValue());
if (content.isSpecial())
{
if (content.isEof())
break;
error = content.getError();
break;
}
byte[] b = new byte[content.remaining()];
readBytes += b.length;
content.getByteBuffer().get(b);
consumedString += new String(b, StandardCharsets.ISO_8859_1);
content.skip(content.remaining());
}
assertThat(nextContentCount, is(totalContentCount));
assertThat(readBytes, is(totalContentBytesCount));
assertThat(consumedString, is(originalContentString));
assertThat(isReadyFalseCount, is(notReadyCount));
assertThat(isReadyTrueCount, is(readyCount));
return error;
}
private static int countRemaining(ByteBuffer[] byteBuffers)
{
int total = 0;
for (ByteBuffer byteBuffer : byteBuffers)
{
total += byteBuffer.remaining();
}
return total;
}
private static String asString(ByteBuffer[] buffers)
{
StringBuilder sb = new StringBuilder();
for (ByteBuffer buffer : buffers)
{
byte[] b = new byte[buffer.remaining()];
buffer.duplicate().get(b);
sb.append(new String(b, StandardCharsets.ISO_8859_1));
}
return sb.toString();
}
private static ByteBuffer gzipByteBuffer(ByteBuffer uncompressedBuffer)
{
try
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
byte[] b = new byte[uncompressedBuffer.remaining()];
uncompressedBuffer.get(b);
output.write(b);
output.close();
return ByteBuffer.wrap(baos.toByteArray());
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
private static class ArrayDelayedHttpChannel extends HttpChannel
{
private final ByteBuffer[] byteBuffers;
private final HttpInput.Content finalContent;
private final ScheduledExecutorService scheduledExecutorService;
private final CyclicBarrier barrier;
private int counter;
private volatile HttpInput.Content nextContent;
public ArrayDelayedHttpChannel(ByteBuffer[] byteBuffers, HttpInput.Content finalContent, ScheduledExecutorService scheduledExecutorService, CyclicBarrier barrier)
{
super(new MockConnector(), new HttpConfiguration(), null, null);
this.byteBuffers = new ByteBuffer[byteBuffers.length];
this.finalContent = finalContent;
this.scheduledExecutorService = scheduledExecutorService;
this.barrier = barrier;
for (int i = 0; i < byteBuffers.length; i++)
{
this.byteBuffers[i] = byteBuffers[i].duplicate();
}
}
@Override
public boolean needContent()
{
if (nextContent != null)
return true;
scheduledExecutorService.schedule(() ->
{
if (byteBuffers.length > counter)
nextContent = new HttpInput.Content(byteBuffers[counter++]);
else
nextContent = finalContent;
try
{
barrier.await(5, TimeUnit.SECONDS);
}
catch (Exception e)
{
throw new AssertionError(e);
}
}, 50, TimeUnit.MILLISECONDS);
return false;
}
@Override
public HttpInput.Content produceContent()
{
HttpInput.Content result = nextContent;
nextContent = null;
return result;
}
@Override
public boolean failAllContent(Throwable failure)
{
nextContent = null;
counter = byteBuffers.length;
return false;
}
@Override
public boolean failed(Throwable x)
{
return false;
}
@Override
protected boolean eof()
{
return false;
}
}
}

View File

@ -0,0 +1,320 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor;
import org.eclipse.jetty.util.compression.InflaterPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
public class BlockingContentProducerTest
{
private ScheduledExecutorService scheduledExecutorService;
private InflaterPool inflaterPool;
@BeforeEach
public void setUp()
{
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
inflaterPool = new InflaterPool(-1, true);
}
@AfterEach
public void tearDown()
{
scheduledExecutorService.shutdownNow();
}
@Test
public void testBlockingContentProducerNoInterceptor()
{
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(buffers);
final String originalContentString = asString(buffers);
AtomicReference<ContentProducer> ref = new AtomicReference<>();
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible());
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
ref.set(contentProducer);
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
assertThat(error, nullValue());
}
@Test
public void testBlockingContentProducerNoInterceptorWithError()
{
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(buffers);
final String originalContentString = asString(buffers);
final Throwable expectedError = new EofException("Early EOF");
AtomicReference<ContentProducer> ref = new AtomicReference<>();
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, () -> ref.get().onContentProducible());
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
ref.set(contentProducer);
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
assertThat(error, is(expectedError));
}
@Test
public void testBlockingContentProducerGzipInterceptor()
{
ByteBuffer[] uncompressedBuffers = new ByteBuffer[3];
uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(uncompressedBuffers);
final String originalContentString = asString(uncompressedBuffers);
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = gzipByteBuffer(uncompressedBuffers[0]);
buffers[1] = gzipByteBuffer(uncompressedBuffers[1]);
buffers[2] = gzipByteBuffer(uncompressedBuffers[2]);
AtomicReference<ContentProducer> ref = new AtomicReference<>();
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible());
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
ref.set(contentProducer);
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
assertThat(error, nullValue());
}
@Test
public void testBlockingContentProducerGzipInterceptorWithTinyBuffers()
{
ByteBuffer[] uncompressedBuffers = new ByteBuffer[3];
uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(uncompressedBuffers);
final String originalContentString = asString(uncompressedBuffers);
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = gzipByteBuffer(uncompressedBuffers[0]);
buffers[1] = gzipByteBuffer(uncompressedBuffers[1]);
buffers[2] = gzipByteBuffer(uncompressedBuffers[2]);
AtomicReference<ContentProducer> ref = new AtomicReference<>();
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible());
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
ref.set(contentProducer);
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, totalContentBytesCount + 1, contentProducer);
assertThat(error, nullValue());
}
@Test
public void testBlockingContentProducerGzipInterceptorWithError()
{
ByteBuffer[] uncompressedBuffers = new ByteBuffer[3];
uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1));
uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1));
uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1));
final int totalContentBytesCount = countRemaining(uncompressedBuffers);
final String originalContentString = asString(uncompressedBuffers);
final Throwable expectedError = new Throwable("HttpInput idle timeout");
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = gzipByteBuffer(uncompressedBuffers[0]);
buffers[1] = gzipByteBuffer(uncompressedBuffers[1]);
buffers[2] = gzipByteBuffer(uncompressedBuffers[2]);
AtomicReference<ContentProducer> ref = new AtomicReference<>();
ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, () -> ref.get().onContentProducible());
ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel));
ref.set(contentProducer);
contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32));
Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer);
assertThat(error, is(expectedError));
}
private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, int totalContentCount, ContentProducer contentProducer)
{
int readBytes = 0;
int nextContentCount = 0;
String consumedString = "";
Throwable error = null;
while (true)
{
HttpInput.Content content = contentProducer.nextContent();
nextContentCount++;
if (content.isSpecial())
{
if (content.isEof())
break;
error = content.getError();
break;
}
byte[] b = new byte[content.remaining()];
content.getByteBuffer().get(b);
consumedString += new String(b, StandardCharsets.ISO_8859_1);
readBytes += b.length;
}
assertThat(readBytes, is(totalContentBytesCount));
assertThat(nextContentCount, is(totalContentCount));
assertThat(consumedString, is(originalContentString));
return error;
}
private static int countRemaining(ByteBuffer[] byteBuffers)
{
int total = 0;
for (ByteBuffer byteBuffer : byteBuffers)
{
total += byteBuffer.remaining();
}
return total;
}
private static String asString(ByteBuffer[] buffers)
{
StringBuilder sb = new StringBuilder();
for (ByteBuffer buffer : buffers)
{
byte[] b = new byte[buffer.remaining()];
buffer.duplicate().get(b);
sb.append(new String(b, StandardCharsets.ISO_8859_1));
}
return sb.toString();
}
private static ByteBuffer gzipByteBuffer(ByteBuffer uncompressedBuffer)
{
try
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
byte[] b = new byte[uncompressedBuffer.remaining()];
uncompressedBuffer.get(b);
output.write(b);
output.close();
return ByteBuffer.wrap(baos.toByteArray());
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
private interface ContentListener
{
void onContent();
}
private static class ArrayDelayedHttpChannel extends HttpChannel
{
private final ByteBuffer[] byteBuffers;
private final HttpInput.Content finalContent;
private final ScheduledExecutorService scheduledExecutorService;
private final ContentListener contentListener;
private int counter;
private volatile HttpInput.Content nextContent;
public ArrayDelayedHttpChannel(ByteBuffer[] byteBuffers, HttpInput.Content finalContent, ScheduledExecutorService scheduledExecutorService, ContentListener contentListener)
{
super(new MockConnector(), new HttpConfiguration(), null, null);
this.byteBuffers = new ByteBuffer[byteBuffers.length];
this.finalContent = finalContent;
this.scheduledExecutorService = scheduledExecutorService;
this.contentListener = contentListener;
for (int i = 0; i < byteBuffers.length; i++)
{
this.byteBuffers[i] = byteBuffers[i].duplicate();
}
}
@Override
public boolean needContent()
{
if (nextContent != null)
return true;
scheduledExecutorService.schedule(() ->
{
if (byteBuffers.length > counter)
nextContent = new HttpInput.Content(byteBuffers[counter++]);
else
nextContent = finalContent;
contentListener.onContent();
}, 50, TimeUnit.MILLISECONDS);
return false;
}
@Override
public HttpInput.Content produceContent()
{
HttpInput.Content result = nextContent;
nextContent = null;
return result;
}
@Override
public boolean failAllContent(Throwable failure)
{
nextContent = null;
counter = byteBuffers.length;
return false;
}
@Override
public boolean failed(Throwable x)
{
return false;
}
@Override
protected boolean eof()
{
return false;
}
}
}

View File

@ -49,13 +49,21 @@ public class HttpWriterTest
HttpChannel channel = new HttpChannel(new MockConnector(), new HttpConfiguration(), null, null)
{
@Override
public void produceContent()
public boolean needContent()
{
return false;
}
@Override
public void failContent(Throwable failure)
public HttpInput.Content produceContent()
{
return null;
}
@Override
public boolean failAllContent(Throwable failure)
{
return false;
}
@Override
@ -63,6 +71,18 @@ public class HttpWriterTest
{
return pool;
}
@Override
public boolean failed(Throwable x)
{
return false;
}
@Override
protected boolean eof()
{
return false;
}
};
_httpOut = new HttpOutput(channel)

View File

@ -178,13 +178,33 @@ public class ResponseTest
})
{
@Override
public void produceContent()
public boolean needContent()
{
return false;
}
@Override
public void failContent(Throwable failure)
public HttpInput.Content produceContent()
{
return null;
}
@Override
public boolean failAllContent(Throwable failure)
{
return false;
}
@Override
public boolean failed(Throwable x)
{
return false;
}
@Override
protected boolean eof()
{
return false;
}
};
}

View File

@ -36,6 +36,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.GZIPOutputStream;
import javax.servlet.AsyncContext;
import javax.servlet.DispatcherType;
@ -75,6 +76,8 @@ import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.compression.CompressionPool;
import org.eclipse.jetty.util.compression.InflaterPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Disabled;
@ -89,6 +92,7 @@ import static org.eclipse.jetty.http.client.Transport.HTTP;
import static org.eclipse.jetty.util.BufferUtil.toArray;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -778,10 +782,18 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
throw new IllegalStateException();
if (input.read() != 'X')
throw new IllegalStateException();
if (!input.isReady())
throw new IllegalStateException();
if (input.read() != -1)
throw new IllegalStateException();
if (input.isReady())
{
try
{
if (input.read() != -1)
throw new IllegalStateException();
}
catch (IOException e)
{
// ignore
}
}
}
catch (IOException x)
{
@ -1346,6 +1358,81 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testAsyncEcho(Transport transport) throws Exception
{
init(transport);
scenario.start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
System.err.println("Service " + request);
AsyncContext asyncContext = request.startAsync();
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
while (input.isReady())
{
int b = input.read();
if (b >= 0)
{
// System.err.printf("0x%2x %s %n", b, Character.isISOControl(b)?"?":(""+(char)b));
response.getOutputStream().write(b);
}
else
return;
}
}
@Override
public void onAllDataRead() throws IOException
{
asyncContext.complete();
}
@Override
public void onError(Throwable x)
{
}
});
}
});
AsyncRequestContent contentProvider = new AsyncRequestContent();
CountDownLatch clientLatch = new CountDownLatch(1);
AtomicReference<Result> resultRef = new AtomicReference<>();
scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
.path(scenario.servletPath)
.body(contentProvider)
.send(new BufferingResponseListener(16 * 1024 * 1024)
{
@Override
public void onComplete(Result result)
{
resultRef.set(result);
clientLatch.countDown();
}
});
for (int i = 0; i < 1_000_000; i++)
{
contentProvider.offer(BufferUtil.toBuffer("S" + i));
}
contentProvider.close();
assertTrue(clientLatch.await(30, TimeUnit.SECONDS));
assertThat(resultRef.get().isSucceeded(), Matchers.is(true));
assertThat(resultRef.get().getResponse().getStatus(), Matchers.equalTo(HttpStatus.OK_200));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testAsyncInterceptedTwice(Transport transport) throws Exception
@ -1359,7 +1446,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
System.err.println("Service " + request);
final HttpInput httpInput = ((Request)request).getHttpInput();
httpInput.addInterceptor(new GzipHttpInputInterceptor(((Request)request).getHttpChannel().getByteBufferPool(), 1024));
httpInput.addInterceptor(new GzipHttpInputInterceptor(new InflaterPool(-1, true), ((Request)request).getHttpChannel().getByteBufferPool(), 1024));
httpInput.addInterceptor(content ->
{
ByteBuffer byteBuffer = content.getByteBuffer();
@ -1406,7 +1493,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
AsyncRequestContent contentProvider = new AsyncRequestContent();
CountDownLatch clientLatch = new CountDownLatch(1);
String expected =
@ -1421,7 +1508,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
.path(scenario.servletPath)
.content(contentProvider)
.body(contentProvider)
.send(new BufferingResponseListener()
{
@Override
@ -1437,19 +1524,11 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
}
});
contentProvider.offer(gzipToBuffer("S0"));
contentProvider.flush();
contentProvider.offer(gzipToBuffer("S1"));
contentProvider.flush();
contentProvider.offer(gzipToBuffer("S2"));
contentProvider.flush();
contentProvider.offer(gzipToBuffer("S3"));
contentProvider.flush();
contentProvider.offer(gzipToBuffer("S4"));
contentProvider.flush();
contentProvider.offer(gzipToBuffer("S5"));
contentProvider.flush();
contentProvider.offer(gzipToBuffer("S6"));
for (int i = 0; i < 7; i++)
{
contentProvider.offer(gzipToBuffer("S" + i));
contentProvider.flush();
}
contentProvider.close();
assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
@ -1534,7 +1613,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
AsyncRequestContent contentProvider = new AsyncRequestContent();
CountDownLatch clientLatch = new CountDownLatch(1);
String expected =
@ -1546,7 +1625,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
.path(scenario.servletPath)
.content(contentProvider)
.body(contentProvider)
.send(new BufferingResponseListener()
{
@Override
@ -1631,18 +1710,21 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
}))
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
latch.countDown();
}
catch (Throwable x)
{
failures.offer(x);
}
finally
{
latch.countDown();
}
}
});
}
assertTrue(latch.await(30, TimeUnit.SECONDS));
assertTrue(failures.isEmpty());
assertThat(failures, empty());
}
private static class Listener implements ReadListener, WriteListener
@ -1771,10 +1853,11 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
}
@Override
public void stopServer()
public void stopServer() throws Exception
{
checkScope();
scope.set(null);
super.stopServer();
}
}
}