diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java index 0c7b168604b..38a46350aae 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java @@ -33,7 +33,10 @@ import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.HttpChannelState; import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpInput; +import org.eclipse.jetty.server.HttpInputOverFCGI; import org.eclipse.jetty.server.HttpTransport; import org.eclipse.jetty.util.StringUtil; import org.slf4j.Logger; @@ -57,6 +60,12 @@ public class HttpChannelOverFCGI extends HttpChannel this.dispatcher = new Dispatcher(connector.getServer().getThreadPool(), this); } + @Override + protected HttpInput newHttpInput(HttpChannelState state) + { + return new HttpInputOverFCGI(state); + } + protected void header(HttpField field) { String name = field.getName(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index cf8e24d5516..12ff69c7ed0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -121,7 +121,8 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor protected HttpInput newHttpInput(HttpChannelState state) { - return new HttpInput(state); + //TODO the HTTP2 impl instantiation should be in a subclass + return new HttpInputOverHTTP2(state); } protected HttpOutput newHttpOutput() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 9b8e1061607..e5a2b1a7697 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -34,6 +34,7 @@ import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpParser.RequestHandler; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.PreEncodedHttpField; import org.eclipse.jetty.io.AbstractConnection; @@ -316,21 +317,20 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } /** - * Fill and parse data looking for content - * - * @return true if an {@link RequestHandler} method was called and it returned true; + * Parse and fill data, looking for content */ - protected boolean fillAndParseForContent() + protected void parseAndFillForContent() { - boolean handled = false; + // parseRequestBuffer() must always be called after fillRequestBuffer() otherwise this method doesn't trigger EOF/earlyEOF + // which breaks AsyncRequestReadTest.testPartialReadThenShutdown() + int filled = Integer.MAX_VALUE; while (_parser.inContentState()) { - int filled = fillRequestBuffer(); - handled = parseRequestBuffer(); + boolean handled = parseRequestBuffer(); if (handled || filled <= 0 || _input.hasContent()) break; + filled = fillRequestBuffer(); } - return handled; } private int fillRequestBuffer() @@ -655,8 +655,15 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http @Override public void succeeded() { - if (_contentBufferReferences.decrementAndGet() == 0) + int counter = _contentBufferReferences.decrementAndGet(); + if (counter == 0) releaseRequestBuffer(); + // TODO: this should do something (warn? fail?) if _contentBufferReferences goes below 0 + if (counter < 0) + { + LOG.warn("Content reference counting went below zero: {}", counter); + _contentBufferReferences.incrementAndGet(); + } } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 02c0be87ba4..ac3bd8444ab 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -18,542 +18,50 @@ package org.eclipse.jetty.server; -import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.Objects; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; -import org.eclipse.jetty.http.BadMessageException; -import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.io.EofException; -import org.eclipse.jetty.io.RuntimeIOException; +import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.component.Destroyable; -import org.eclipse.jetty.util.thread.AutoLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}. - *

- * Content may arrive in patterns such as [content(), content(), messageComplete()] - * so that this class maintains two states: the content state that tells - * whether there is content to consume and the EOF state that tells whether an EOF has arrived. - * Only once the content has been consumed the content state is moved to the EOF state. - *

+ *

This would be an interface if ServletInputStream was an interface too.

+ *

While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link + * ContextHandler#handle(Runnable)} to setup classloaders etc.

*/ -public class HttpInput extends ServletInputStream implements Runnable +public abstract class HttpInput extends ServletInputStream implements Runnable { - /** - * An interceptor for HTTP Request input. - *

- * Unlike InputStream wrappers that can be applied by filters, an interceptor - * is entirely transparent and works with async IO APIs. - *

- *

- * An Interceptor may consume data from the passed content and the interceptor - * will continue to be called for the same content until the interceptor returns - * null or an empty content. Thus even if the passed content is completely consumed - * the interceptor will be called with the same content until it can no longer - * produce more content. - *

- * - * @see HttpInput#setInterceptor(Interceptor) - * @see HttpInput#addInterceptor(Interceptor) - */ - public interface Interceptor - { - /** - * @param content The content to be intercepted (may be empty or a {@link SentinelContent}. - * The content will be modified with any data the interceptor consumes, but there is no requirement - * that all the data is consumed by the interceptor. - * @return The intercepted content or null if interception is completed for that content. - */ - Content readFrom(Content content); - } - /** - * An {@link Interceptor} that chains two other {@link Interceptor}s together. - * The {@link Interceptor#readFrom(Content)} calls the previous {@link Interceptor}'s - * {@link Interceptor#readFrom(Content)} and then passes any {@link Content} returned - * to the next {@link Interceptor}. - */ - public static class ChainedInterceptor implements Interceptor, Destroyable - { - private final Interceptor _prev; - private final Interceptor _next; - - public ChainedInterceptor(Interceptor prev, Interceptor next) - { - _prev = prev; - _next = next; - } - - public Interceptor getPrev() - { - return _prev; - } - - public Interceptor getNext() - { - return _next; - } - - @Override - public Content readFrom(Content content) - { - return getNext().readFrom(getPrev().readFrom(content)); - } - - @Override - public void destroy() - { - if (_prev instanceof Destroyable) - ((Destroyable)_prev).destroy(); - if (_next instanceof Destroyable) - ((Destroyable)_next).destroy(); - } - } - - private static final Logger LOG = LoggerFactory.getLogger(HttpInput.class); - static final Content EOF_CONTENT = new EofContent("EOF"); - static final Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF"); - - private final AutoLock.WithCondition _lock = new AutoLock.WithCondition(); - private final byte[] _oneByteBuffer = new byte[1]; - private Content _content; - private Content _intercepted; - private final Deque _inputQ = new ArrayDeque<>(); - private final HttpChannelState _channelState; - private ReadListener _listener; - private State _state = STREAM; - private long _firstByteTimeStamp = -1; - private long _contentArrived; - private long _contentConsumed; - private long _blockUntil; - private boolean _waitingForContent; - private Interceptor _interceptor; - - public HttpInput(HttpChannelState state) - { - _channelState = state; - } - - protected HttpChannelState getHttpChannelState() - { - return _channelState; - } - - public void recycle() - { - try (AutoLock l = _lock.lock()) - { - if (_content != null) - _content.failed(null); - _content = null; - Content item = _inputQ.poll(); - while (item != null) - { - item.failed(null); - item = _inputQ.poll(); - } - _listener = null; - _state = STREAM; - _contentArrived = 0; - _contentConsumed = 0; - _firstByteTimeStamp = -1; - _blockUntil = 0; - _waitingForContent = false; - if (_interceptor instanceof Destroyable) - ((Destroyable)_interceptor).destroy(); - _interceptor = null; - } - } + public abstract void recycle(); /** * @return The current Interceptor, or null if none set */ - public Interceptor getInterceptor() - { - return _interceptor; - } + public abstract Interceptor getInterceptor(); /** * Set the interceptor. * * @param interceptor The interceptor to use. */ - public void setInterceptor(Interceptor interceptor) - { - _interceptor = interceptor; - } + public abstract void setInterceptor(Interceptor interceptor); /** - * Set the {@link Interceptor}, using a {@link ChainedInterceptor} if - * an {@link Interceptor} is already set. + * Set the {@link org.eclipse.jetty.server.HttpInput.Interceptor}, chaining it to the existing one if + * an {@link org.eclipse.jetty.server.HttpInput.Interceptor} is already set. * - * @param interceptor the next {@link Interceptor} in a chain + * @param interceptor the next {@link org.eclipse.jetty.server.HttpInput.Interceptor} in a chain */ - public void addInterceptor(Interceptor interceptor) - { - if (_interceptor == null) - _interceptor = interceptor; - else - _interceptor = new ChainedInterceptor(_interceptor, interceptor); - } - - @Override - public int available() - { - int available = 0; - boolean woken = false; - try (AutoLock l = _lock.lock()) - { - if (_content == null) - _content = _inputQ.poll(); - if (_content == null) - { - try - { - produceContent(); - } - catch (IOException e) - { - woken = failed(e); - } - if (_content == null) - _content = _inputQ.poll(); - } - - if (_content != null) - available = _content.remaining(); - } - - if (woken) - wake(); - return available; - } - - protected void wake() - { - HttpChannel channel = _channelState.getHttpChannel(); - Executor executor = channel.getConnector().getServer().getThreadPool(); - executor.execute(channel); - } - - @Override - public int read() throws IOException - { - int read = read(_oneByteBuffer, 0, 1); - if (read == 0) - throw new IllegalStateException("unready read=0"); - return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException - { - boolean wake = false; - int read; - try (AutoLock l = _lock.lock()) - { - // Calculate minimum request rate for DOS protection - long minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate(); - if (minRequestDataRate > 0 && _firstByteTimeStamp != -1) - { - long period = System.nanoTime() - _firstByteTimeStamp; - if (period > 0) - { - long minimumData = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1); - if (_contentArrived < minimumData) - { - BadMessageException bad = new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408, - String.format("Request content data rate < %d B/s", minRequestDataRate)); - if (_channelState.isResponseCommitted()) - _channelState.getHttpChannel().abort(bad); - throw bad; - } - } - } - - // Consume content looking for bytes to read - while (true) - { - Content item = nextContent(); - if (item != null) - { - read = get(item, b, off, len); - if (LOG.isDebugEnabled()) - LOG.debug("{} read {} from {}", this, read, item); - - // Consume any following poison pills - if (item.isEmpty()) - nextInterceptedContent(); - break; - } - - // No content, so should we block? - if (!_state.blockForContent(this)) - { - // Not blocking, so what should we return? - read = _state.noContent(); - - if (read < 0) - // If EOF do we need to wake for allDataRead callback? - wake = _channelState.onReadEof(); - break; - } - } - } - - if (wake) - wake(); - return read; - } - - /** - * Called when derived implementations should attempt to produce more Content and add it via {@link #addContent(Content)}. For protocols that are constantly - * producing (eg HTTP2) this can be left as a noop; - * - * @throws IOException if unable to produce content - */ - protected void produceContent() throws IOException - { - } + public abstract void addInterceptor(Interceptor interceptor); /** * Called by channel when asynchronous IO needs to produce more content * * @throws IOException if unable to produce content */ - public void asyncReadProduce() throws IOException - { - try (AutoLock l = _lock.lock()) - { - produceContent(); - } - } - - /** - * Get the next content from the inputQ, calling {@link #produceContent()} if need be. EOF is processed and state changed. - * - * @return the content or null if none available. - * @throws IOException if retrieving the content fails - */ - protected Content nextContent() throws IOException - { - Content content = nextNonSentinelContent(); - if (content == null && !isFinished()) - { - produceContent(); - content = nextNonSentinelContent(); - } - return content; - } - - /** - * Poll the inputQ for Content. Consumed buffers and {@link SentinelContent}s are removed and EOF state updated if need be. - * - * @return Content or null - */ - protected Content nextNonSentinelContent() - { - while (true) - { - // Get the next content (or EOF) - Content content = nextInterceptedContent(); - - // If it is EOF, consume it here - if (content instanceof SentinelContent) - { - // Consume the EOF content, either if it was original content - // or if it was produced by interception - consume(content); - continue; - } - - return content; - } - } - - /** - * Get the next readable from the inputQ, calling {@link #produceContent()} if need be. EOF is NOT processed and state is not changed. - * - * @return the content or EOF or null if none available. - * @throws IOException if retrieving the content fails - */ - protected Content produceNextContext() throws IOException - { - Content content = nextInterceptedContent(); - if (content == null && !isFinished()) - { - produceContent(); - content = nextInterceptedContent(); - } - return content; - } - - /** - * Poll the inputQ for Content or EOF. Consumed buffers and non EOF {@link SentinelContent}s are removed. EOF state is not updated. - * Interception is done within this method. - * - * @return Content with remaining, a {@link SentinelContent}, or null - */ - protected Content nextInterceptedContent() - { - // If we have a chunk produced by interception - if (_intercepted != null) - { - // Use it if it has any remaining content - if (_intercepted.hasContent()) - return _intercepted; - - // succeed the chunk - _intercepted.succeeded(); - _intercepted = null; - } - - // If we don't have a Content under consideration, get - // the next one off the input Q. - if (_content == null) - _content = _inputQ.poll(); - - // While we have content to consider. - while (_content != null) - { - // Are we intercepting? - if (_interceptor != null) - { - // Intercept the current content (may be called several - // times for the same content - _intercepted = _interceptor.readFrom(_content); - - // If interception produced new content - if (_intercepted != null && _intercepted != _content) - { - // if it is not empty use it - if (_intercepted.hasContent()) - return _intercepted; - _intercepted.succeeded(); - } - - // intercepted content consumed - _intercepted = null; - - // fall through so that the unintercepted _content is - // considered for any remaining content, for EOF and to - // succeed it if it is entirely consumed. - } - - // If the content has content or is an EOF marker, use it - if (_content.hasContent() || _content instanceof SentinelContent) - return _content; - - // The content is consumed, so get the next one. Note that EOF - // content is never consumed here, but in #pollContent - _content.succeeded(); - _content = _inputQ.poll(); - } - - return null; - } - - private void consume(Content content) - { - if (!isError() && content instanceof EofContent) - { - if (content == EARLY_EOF_CONTENT) - _state = EARLY_EOF; - else if (_listener == null) - _state = EOF; - else - _state = AEOF; - } - - // Consume the content, either if it was original content - // or if it was produced by interception - content.succeeded(); - if (_content == content) - _content = null; - else if (_intercepted == content) - _intercepted = null; - } - - /** - * Copies the given content into the given byte buffer. - * - * @param content the content to copy from - * @param buffer the buffer to copy into - * @param offset the buffer offset to start copying from - * @param length the space available in the buffer - * @return the number of bytes actually copied - */ - protected int get(Content content, byte[] buffer, int offset, int length) - { - int l = content.get(buffer, offset, length); - _contentConsumed += l; - return l; - } - - /** - * Consumes the given content. Calls the content succeeded if all content consumed. - * - * @param content the content to consume - * @param length the number of bytes to consume - */ - protected void skip(Content content, int length) - { - int l = content.skip(length); - - _contentConsumed += l; - if (l > 0 && content.isEmpty()) - nextNonSentinelContent(); // hungry succeed - } - - /** - * Blocks until some content or some end-of-file event arrives. - * - * @throws IOException if the wait is interrupted - */ - protected void blockForContent() throws IOException - { - assert _lock.isHeldByCurrentThread(); - try - { - _waitingForContent = true; - _channelState.getHttpChannel().onBlockWaitForContent(); - - boolean loop = false; - long timeout = 0; - while (true) - { - // This method is called from a loop, so we just - // need to check the timeout before and after waiting. - if (loop) - break; - - if (LOG.isDebugEnabled()) - LOG.debug("{} blocking for content timeout={}", this, timeout); - if (timeout > 0) - _lock.await(timeout, TimeUnit.MILLISECONDS); - else - _lock.await(); - - loop = true; - } - } - catch (Throwable x) - { - _channelState.getHttpChannel().onBlockWaitForContentFailure(x); - } - } + public abstract void asyncReadProduce() throws IOException; /** * Adds some content to this input stream. @@ -561,70 +69,17 @@ public class HttpInput extends ServletInputStream implements Runnable * @param content the content to add * @return true if content channel woken for read */ - public boolean addContent(Content content) - { - try (AutoLock l = _lock.lock()) - { - _waitingForContent = false; - if (_firstByteTimeStamp == -1) - _firstByteTimeStamp = System.nanoTime(); + public abstract boolean addContent(Content content); - if (isFinished()) - { - Throwable failure = isError() ? _state.getError() : new EOFException("Content after EOF"); - content.failed(failure); - return false; - } - else - { - _contentArrived += content.remaining(); + public abstract boolean hasContent(); - if (_content == null && _inputQ.isEmpty()) - _content = content; - else - _inputQ.offer(content); + public abstract void unblock(); - if (LOG.isDebugEnabled()) - LOG.debug("{} addContent {}", this, content); - - if (nextInterceptedContent() != null) - return wakeup(); - else - return false; - } - } - } - - public boolean hasContent() - { - try (AutoLock l = _lock.lock()) - { - return _content != null || _inputQ.size() > 0; - } - } - - public void unblock() - { - try (AutoLock.WithCondition l = _lock.lock()) - { - l.signal(); - } - } - - public long getContentConsumed() - { - try (AutoLock l = _lock.lock()) - { - return _contentConsumed; - } - } + public abstract long getContentLength(); public long getContentReceived() { - synchronized (_inputQ) - { - return _contentArrived; - } + return getContentLength(); } /** @@ -634,350 +89,40 @@ public class HttpInput extends ServletInputStream implements Runnable * * @return true if content channel woken for read */ - public boolean earlyEOF() - { - return addContent(EARLY_EOF_CONTENT); - } + public abstract boolean earlyEOF(); /** * This method should be called to signal that all the expected content arrived. * * @return true if content channel woken for read */ - public boolean eof() - { - return addContent(EOF_CONTENT); - } + public abstract boolean eof(); - public boolean consumeAll() - { - try (AutoLock l = _lock.lock()) - { - try - { - while (true) - { - Content item = nextContent(); - if (item == null) - break; // Let's not bother blocking + public abstract boolean consumeAll(); - skip(item, item.remaining()); - } - if (isFinished()) - return !isError(); + public abstract boolean isError(); - _state = EARLY_EOF; - return false; - } - catch (Throwable e) - { - if (LOG.isDebugEnabled()) - LOG.debug("Unable to consume all input", e); - _state = new ErrorState(e); - return false; - } - } - } + public abstract boolean isAsync(); - public boolean isError() - { - try (AutoLock l = _lock.lock()) - { - return _state instanceof ErrorState; - } - } + public abstract boolean onIdleTimeout(Throwable x); - public boolean isAsync() - { - try (AutoLock l = _lock.lock()) - { - return _state == ASYNC; - } - } + public abstract boolean failed(Throwable x); @Override - public boolean isFinished() - { - try (AutoLock l = _lock.lock()) - { - return _state instanceof EOFState; - } - } + public abstract int read(byte[] b, int off, int len) throws IOException; @Override - public boolean isReady() + public abstract int available() throws IOException; + + public interface Interceptor { - try - { - try (AutoLock l = _lock.lock()) - { - if (_listener == null) - return true; - if (_state instanceof EOFState) - return true; - if (_waitingForContent) - return false; - if (produceNextContext() != null) - return true; - _channelState.onReadUnready(); - _waitingForContent = true; - } - return false; - } - catch (IOException e) - { - LOG.trace("IGNORED", e); - return true; - } - } - - @Override - public void setReadListener(ReadListener readListener) - { - boolean woken = false; - try - { - try (AutoLock l = _lock.lock()) - { - if (_listener != null) - throw new IllegalStateException("ReadListener already set"); - - _listener = Objects.requireNonNull(readListener); - - //illegal if async not started - if (!_channelState.isAsyncStarted()) - throw new IllegalStateException("Async not started"); - - if (isError()) - { - woken = _channelState.onReadReady(); - } - else - { - Content content = produceNextContext(); - if (content != null) - { - _state = ASYNC; - woken = _channelState.onReadReady(); - } - else if (_state == EOF) - { - _state = AEOF; - woken = _channelState.onReadEof(); - } - else - { - _state = ASYNC; - _channelState.onReadUnready(); - _waitingForContent = true; - } - } - } - } - catch (IOException e) - { - throw new RuntimeIOException(e); - } - - if (woken) - wake(); - } - - public boolean onIdleTimeout(Throwable x) - { - try (AutoLock l = _lock.lock()) - { - boolean neverDispatched = getHttpChannelState().isIdle(); - if ((_waitingForContent || neverDispatched) && !isError()) - { - x.addSuppressed(new Throwable("HttpInput idle timeout")); - _state = new ErrorState(x); - return wakeup(); - } - return false; - } - } - - public boolean failed(Throwable x) - { - try (AutoLock l = _lock.lock()) - { - // Errors may be reported multiple times, for example - // a local idle timeout and a remote I/O failure. - if (isError()) - { - if (LOG.isDebugEnabled()) - { - // Log both the original and current failure - // without modifying the original failure. - Throwable failure = new Throwable(_state.getError()); - failure.addSuppressed(x); - LOG.debug("HttpInput failure", failure); - } - } - else - { - // Add a suppressed throwable to capture this stack - // trace without wrapping/hiding the original failure. - x.addSuppressed(new Throwable("HttpInput failure")); - _state = new ErrorState(x); - } - return wakeup(); - } - } - - private boolean wakeup() - { - assert _lock.isHeldByCurrentThread(); - if (_listener != null) - return _channelState.onContentAdded(); - _lock.signal(); - return false; - } - - /* - *

While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link - * ContextHandler#handle(Runnable)} to setup classloaders etc.

- */ - @Override - public void run() - { - final ReadListener listener; - Throwable error; - boolean aeof = false; - - try (AutoLock l = _lock.lock()) - { - listener = _listener; - - if (_state == EOF) - return; - - if (_state == AEOF) - { - _state = EOF; - aeof = true; - } - - error = _state.getError(); - - if (!aeof && error == null) - { - Content content = nextInterceptedContent(); - if (content == null) - return; - - // Consume a directly received EOF without first calling onDataAvailable - // So -1 will never be read and only onAddDataRread or onError will be called - if (content instanceof EofContent) - { - consume(content); - if (_state == EARLY_EOF) - error = _state.getError(); - else if (_state == AEOF) - { - aeof = true; - _state = EOF; - } - } - } - } - - try - { - if (error != null) - { - // TODO is this necessary to add here? - _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); - listener.onError(error); - } - else if (aeof) - { - listener.onAllDataRead(); - } - else - { - listener.onDataAvailable(); - // If -1 was read, then HttpChannelState#onEOF will have been called and a subsequent - // unhandle will call run again so onAllDataRead() can be called. - } - } - catch (Throwable e) - { - if (LOG.isDebugEnabled()) - LOG.warn("Unable to notify listener", e); - else - LOG.warn("Unable to notify listener: {}", e.toString()); - try - { - if (aeof || error == null) - { - _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); - listener.onError(e); - } - } - catch (Throwable e2) - { - String msg = "Unable to notify error to listener"; - if (LOG.isDebugEnabled()) - LOG.warn(msg, e2); - else - LOG.warn("{}: {}", msg, e2.toString()); - throw new RuntimeIOException(msg, e2); - } - } - } - - @Override - public String toString() - { - State state; - long consumed; - int q; - Content content; - try (AutoLock l = _lock.lock()) - { - state = _state; - consumed = _contentConsumed; - q = _inputQ.size(); - content = _inputQ.peekFirst(); - } - return String.format("%s@%x[c=%d,q=%d,[0]=%s,s=%s]", - getClass().getSimpleName(), - hashCode(), - consumed, - q, - content, - state); - } - - /** - * A Sentinel Content, which has zero length content but - * indicates some other event in the input stream (eg EOF) - */ - public static class SentinelContent extends Content - { - private final String _name; - - public SentinelContent(String name) - { - super(BufferUtil.EMPTY_BUFFER); - _name = name; - } - - @Override - public String toString() - { - return _name; - } - } - - public static class EofContent extends SentinelContent - { - EofContent(String name) - { - super(name); - } + /** + * @param content The content to be intercepted. + * The content will be modified with any data the interceptor consumes, but there is no requirement + * that all the data is consumed by the interceptor. + * @return The intercepted content or null if interception is completed for that content. + */ + Content readFrom(Content content); } public static class Content implements Callback @@ -1036,125 +181,4 @@ public class HttpInput extends ServletInputStream implements Runnable } } - protected abstract static class State - { - public boolean blockForContent(HttpInput in) throws IOException - { - return false; - } - - public int noContent() throws IOException - { - return -1; - } - - public Throwable getError() - { - return null; - } - } - - protected static class EOFState extends State - { - } - - protected class ErrorState extends EOFState - { - final Throwable _error; - - ErrorState(Throwable error) - { - _error = error; - } - - @Override - public Throwable getError() - { - return _error; - } - - @Override - public int noContent() throws IOException - { - if (_error instanceof IOException) - throw (IOException)_error; - throw new IOException(_error); - } - - @Override - public String toString() - { - return "ERROR:" + _error; - } - } - - protected static final State STREAM = new State() - { - @Override - public boolean blockForContent(HttpInput input) throws IOException - { - input.blockForContent(); - return true; - } - - @Override - public String toString() - { - return "STREAM"; - } - }; - - protected static final State ASYNC = new State() - { - @Override - public int noContent() throws IOException - { - return 0; - } - - @Override - public String toString() - { - return "ASYNC"; - } - }; - - protected static final State EARLY_EOF = new EOFState() - { - @Override - public int noContent() throws IOException - { - throw getError(); - } - - @Override - public String toString() - { - return "EARLY_EOF"; - } - - @Override - public IOException getError() - { - return new EofException("Early EOF"); - } - }; - - protected static final State EOF = new EOFState() - { - @Override - public String toString() - { - return "EOF"; - } - }; - - protected static final State AEOF = new EOFState() - { - @Override - public String toString() - { - return "AEOF"; - } - }; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverFCGI.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverFCGI.java new file mode 100644 index 00000000000..03da948c2f1 --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverFCGI.java @@ -0,0 +1,699 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.IOException; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.Condition; +import javax.servlet.ReadListener; + +import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.util.component.Destroyable; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.AutoLock; + +import static org.eclipse.jetty.server.HttpInputOverHTTP.Eof; + +public class HttpInputOverFCGI extends HttpInput +{ + private static final Logger LOG = Log.getLogger(HttpInputOverFCGI.class); + + private final byte[] _oneByteBuffer = new byte[1]; + private final HttpChannelState _channelState; + + private final AutoLock _contentLock = new AutoLock(); + private final Condition _contentLockCondition = _contentLock.newCondition(); + + private final ContentProducer _contentProducer; + private Eof _eof = Eof.NOT_YET; + private Throwable _error; + private ReadListener _readListener; + + public HttpInputOverFCGI(HttpChannelState state) + { + _channelState = state; + _contentProducer = new ContentProducer(); + } + + /* HttpInput */ + + @Override + public void recycle() + { + try (AutoLock lock = _contentLock.lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("recycle"); + _contentProducer.recycle(); + _eof = Eof.NOT_YET; + _error = null; + _readListener = null; + } + } + + @Override + public Interceptor getInterceptor() + { + try (AutoLock lock = _contentLock.lock()) + { + return _contentProducer.getInterceptor(); + } + } + + @Override + public void setInterceptor(Interceptor interceptor) + { + try (AutoLock lock = _contentLock.lock()) + { + _contentProducer.setInterceptor(interceptor); + } + } + + @Override + public void addInterceptor(Interceptor interceptor) + { + try (AutoLock lock = _contentLock.lock()) + { + _contentProducer.addInterceptor(interceptor); + } + } + + @Override + public void asyncReadProduce() + { + try (AutoLock lock = _contentLock.lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("asyncReadProduce {}", _contentProducer); + _contentProducer.produceRawContent(); + } + } + + @Override + public boolean addContent(Content content) + { + try (AutoLock lock = _contentLock.lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("addContent {} {}", content, _contentProducer); + long contentArrived = _contentProducer.addContent(content); + long requestContentLength = _channelState.getHttpChannel().getRequest().getContentLengthLong(); + // return false to make the parser go on, true to make it stop + // -> tell the parser to stop adding content, unless we have read everything + boolean stopParsing = requestContentLength == -1 || contentArrived < requestContentLength; + if (isAsync()) + _channelState.onContentAdded(); + return stopParsing; + } + } + + @Override + public boolean hasContent() + { + try (AutoLock lock = _contentLock.lock()) + { + return _contentProducer.hasRawContent(); + } + } + + @Override + public void unblock() + { + try (AutoLock lock = _contentLock.lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("signalling blocked thread to wake up"); + _contentLockCondition.signal(); + } + } + + @Override + public long getContentLength() + { + try (AutoLock lock = _contentLock.lock()) + { + return _contentProducer.getRawContentArrived(); + } + } + + @Override + public boolean earlyEOF() + { + try (AutoLock lock = _contentLock.lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("received early EOF"); + _eof = Eof.EARLY_EOF; + if (isAsync()) + return _channelState.onContentAdded(); + unblock(); + return false; + } + } + + @Override + public boolean eof() + { + try (AutoLock lock = _contentLock.lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("received EOF"); + _eof = Eof.EOF; + if (isAsync()) + return _channelState.onContentAdded(); + unblock(); + return false; + } + } + + @Override + public boolean consumeAll() + { + try (AutoLock lock = _contentLock.lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("consume all"); + _contentProducer.consumeAll(); + if (_eof.isEof()) + _eof = Eof.CONSUMED_EOF; + + if (isFinished()) + return !isError(); + + _eof = Eof.EARLY_EOF; + return false; + } + } + + @Override + public boolean isError() + { + try (AutoLock lock = _contentLock.lock()) + { + return _error != null; + } + } + + @Override + public boolean isAsync() + { + try (AutoLock lock = _contentLock.lock()) + { + return _readListener != null; + } + } + + @Override + public boolean onIdleTimeout(Throwable x) + { + //TODO implement me! + return false; + } + + @Override + public boolean failed(Throwable x) + { + try (AutoLock lock = _contentLock.lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("failed " + x); + if (_error != null) + _error.addSuppressed(x); + else + _error = x; + + if (isAsync()) + return _channelState.onContentAdded(); + unblock(); + return false; + } + } + + /* ServletInputStream */ + + @Override + public boolean isFinished() + { + try (AutoLock lock = _contentLock.lock()) + { + boolean finished = !_contentProducer.hasRawContent() && _eof.isConsumed(); + if (LOG.isDebugEnabled()) + LOG.debug("isFinished? {}", finished); + return finished; + } + } + + @Override + public boolean isReady() + { + try (AutoLock lock = _contentLock.lock()) + { + // calling _contentProducer.hasTransformedContent() might change the _eof state, so the following test order matters + if (_contentProducer.hasTransformedContent() || _eof.isEof()) + { + if (LOG.isDebugEnabled()) + LOG.debug("isReady? true"); + return true; + } + if (LOG.isDebugEnabled()) + LOG.debug("isReady? false"); + _channelState.onReadUnready(); + return false; + } + } + + @Override + public void setReadListener(ReadListener readListener) + { + try (AutoLock lock = _contentLock.lock()) + { + if (_readListener != null) + throw new IllegalStateException("ReadListener already set"); + _readListener = Objects.requireNonNull(readListener); + + if (LOG.isDebugEnabled()) + LOG.debug("setReadListener error=" + _error + " eof=" + _eof + " " + _contentProducer); + boolean woken; + if (isError()) + { + woken = _channelState.onReadReady(); + } + else + { + if (_contentProducer.hasTransformedContent()) + { + woken = _channelState.onReadReady(); + } + else if (_eof.isEof()) + { + woken = _channelState.onReadEof(); + } + else + { + _channelState.onReadUnready(); + woken = false; + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("setReadListener woken=" + woken); + if (woken) + scheduleReadListenerNotification(); + } + } + + private void scheduleReadListenerNotification() + { + HttpChannel channel = _channelState.getHttpChannel(); + channel.execute(channel); + } + + @Override + public int read() throws IOException + { + int read = read(_oneByteBuffer, 0, 1); + if (read == 0) + throw new IOException("unready read=0"); + return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + try (AutoLock lock = _contentLock.lock()) + { + while (true) + { + int read = _contentProducer.read(b, off, len); + if (LOG.isDebugEnabled()) + LOG.debug("read produced {} byte(s)", read); + if (read > 0) + return read; + + if (LOG.isDebugEnabled()) + LOG.debug("read error = " + _error); + if (_error != null) + throw new IOException(_error); + + if (LOG.isDebugEnabled()) + LOG.debug("read EOF = {}", _eof); + if (_eof.isEarly()) + throw new EofException("Early EOF"); + + if (LOG.isDebugEnabled()) + LOG.debug("read async = {}", isAsync()); + if (!isAsync()) + { + if (_eof.isEof()) + { + _eof = Eof.CONSUMED_EOF; + if (LOG.isDebugEnabled()) + LOG.debug("read on EOF, switching to CONSUMED_EOF and returning"); + return -1; + } + if (LOG.isDebugEnabled()) + LOG.debug("read blocked"); + blockForContent(); + if (LOG.isDebugEnabled()) + LOG.debug("read unblocked"); + } + else + { + if (_eof.isEof()) + { + boolean wasInAsyncWait = _channelState.onReadEof(); + if (wasInAsyncWait) + scheduleReadListenerNotification(); + if (LOG.isDebugEnabled()) + LOG.debug("async read on EOF (was in async wait? {}), switching to CONSUMED_EOF and returning", wasInAsyncWait); + _eof = Eof.CONSUMED_EOF; + return -1; + } + else + { + //TODO returning 0 breaks the InputStream contract. Shouldn't IOException be thrown instead? + _channelState.getHttpChannel().onAsyncWaitForContent(); // switches on fill interested + return 0; + } + } + } + } + } + + @Override + public int available() + { + try (AutoLock lock = _contentLock.lock()) + { + int available = _contentProducer.available(); + if (LOG.isDebugEnabled()) + LOG.debug("available = {}", available); + return available; + } + } + + private void blockForContent() + { + try + { + _channelState.getHttpChannel().onBlockWaitForContent(); // switches on fill interested + if (LOG.isDebugEnabled()) + LOG.debug("waiting for signal to wake up"); + _contentLockCondition.await(); + if (LOG.isDebugEnabled()) + LOG.debug("signalled to wake up"); + } + catch (Throwable x) + { + _channelState.getHttpChannel().onBlockWaitForContentFailure(x); + } + } + + /* Runnable */ + + /* + *

While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link + * ContextHandler#handle(Runnable)} to setup classloaders etc.

+ */ + @Override + public void run() + { + try (AutoLock lock = _contentLock.lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("running"); + if (!_contentProducer.hasRawContent()) + { + if (_error != null || _eof.isEarly()) + { + // TODO is this necessary to add here? + _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); + if (_error != null) + _readListener.onError(_error); + else + _readListener.onError(new EofException("Early EOF")); + } + else if (_eof.isEof()) + { + try + { + _readListener.onAllDataRead(); + } + catch (Throwable x) + { + _readListener.onError(x); + } + } + // else: !hasContent() && !error && !EOF -> no-op + } + else + { + try + { + _readListener.onDataAvailable(); + } + catch (Throwable x) + { + _readListener.onError(x); + } + } + } + } + + private static class ContentProducer + { + // Note: _rawContent can never be null for as long as _transformedContent is not null. + private final Queue _rawContentQueue = new LinkedBlockingQueue<>(); + private HttpInput.Content _currentRawContent; + private HttpInput.Content _transformedContent; + private long _rawContentArrived; + private HttpInput.Interceptor _interceptor; + + void recycle() + { + if (LOG.isDebugEnabled()) + LOG.debug("recycle {}", this); + if (_transformedContent == _currentRawContent) + _transformedContent = null; + if (_transformedContent != null && !_transformedContent.isEmpty()) + _transformedContent.failed(null); + _transformedContent = null; + if (_currentRawContent != null && !_currentRawContent.isEmpty()) + _currentRawContent.failed(null); + _currentRawContent = null; + _rawContentQueue.clear(); + _rawContentArrived = 0L; + if (_interceptor instanceof Destroyable) + ((Destroyable)_interceptor).destroy(); + _interceptor = null; + } + + int available() + { + if (_currentRawContent == null) + produceRawContent(); + return _currentRawContent == null ? 0 : _currentRawContent.remaining(); + } + + long getRawContentArrived() + { + return _rawContentArrived; + } + + boolean hasRawContent() + { + return _currentRawContent != null; + } + + boolean hasTransformedContent() + { + if (_transformedContent != null) + return true; + if (_currentRawContent == null) + produceRawContent(); + produceTransformedContent(); + return _transformedContent != null; + } + + HttpInput.Interceptor getInterceptor() + { + return _interceptor; + } + + void setInterceptor(HttpInput.Interceptor interceptor) + { + this._interceptor = interceptor; + } + + void addInterceptor(HttpInput.Interceptor interceptor) + { + if (_interceptor == null) + _interceptor = interceptor; + else + _interceptor = new HttpInputOverHTTP.ChainedInterceptor(_interceptor, interceptor); + } + + long addContent(HttpInput.Content content) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} addContent {}", this, content); + if (content == null) + throw new AssertionError("Cannot add null content"); + + _rawContentQueue.offer(content); + _rawContentArrived += content.remaining(); + + return _rawContentArrived; + } + + void consumeAll() + { + if (LOG.isDebugEnabled()) + LOG.debug("{} consumeAll", this); + // start by depleting the current _transformedContent + if (_transformedContent != null) + { + _transformedContent.skip(_transformedContent.remaining()); + if (_transformedContent != _currentRawContent) + _transformedContent.succeeded(); + _transformedContent = null; + } + + // don't bother transforming content, directly deplete the raw one + while (true) + { + if (_currentRawContent == null) + produceRawContent(); + if (_currentRawContent == null) + break; + + _currentRawContent.skip(_currentRawContent.remaining()); + _currentRawContent.succeeded(); + _currentRawContent = null; + } + } + + int read(byte[] b, int off, int len) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} read", this); + while (_transformedContent == null) + { + if (_currentRawContent == null) + { + produceRawContent(); + if (_currentRawContent == null) + return 0; + } + produceTransformedContent(); + } + + int read = _transformedContent.get(b, off, len); + if (_transformedContent.isEmpty()) + produceTransformedContent(); + + return read; + } + + /** + * Call the parser so that it's going to continue parsing of the request buffer, filling it with the socket's buffer + * if needed until either the request buffer is empty with no bytes left in the socket's buffer or {@link #addContent(Content)} + * is called. + */ + void produceRawContent() + { + if (LOG.isDebugEnabled()) + LOG.debug("{} produceRawContent", this); + _currentRawContent = _rawContentQueue.poll(); + } + + /** + * Read {@code _rawContent} and {@code _transformedContent} to produce the next non-empty content to work with and store it in {@code _transformedContent}, + * or store null in {@code _transformedContent} if there is no content to work with. + * Depleted content gets succeeded and its field nullified, which can happen for both {@code _rawContent} and {@code _transformedContent}. + */ + private void produceTransformedContent() + { + if (LOG.isDebugEnabled()) + LOG.debug("{} produceTransformedContent", this); + if (_interceptor == null) + { + // no interceptor set + if (_currentRawContent != null && _currentRawContent.isEmpty()) + { + _currentRawContent.succeeded(); + _currentRawContent = null; + _transformedContent = null; + } + else + { + _transformedContent = _currentRawContent; + } + } + else + { + // interceptor set + transformContent(); + if (_transformedContent == null) + { + if (_currentRawContent != null && _currentRawContent.isEmpty()) + { + _currentRawContent.succeeded(); + _currentRawContent = null; + } + else + { + _transformedContent = _currentRawContent; + } + } + } + } + + /** + * Read {@code _rawContent} and write {@code _transformedContent} to produce content using the interceptor. + * The produced content is guaranteed to either be null or not empty. + */ + private void transformContent() + { + if (LOG.isDebugEnabled()) + LOG.debug("{} transformContent", this); + if (_currentRawContent == null) + return; + + _transformedContent = _interceptor.readFrom(_currentRawContent); + + if (_transformedContent != null && _transformedContent.isEmpty()) + { + if (_transformedContent != _currentRawContent) + _transformedContent.succeeded(); + _transformedContent = null; + } + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "[i=" + _interceptor + ",b=" + _rawContentArrived + + ",r=" + _currentRawContent + ",t=" + _transformedContent + "]"; + } + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java index 6f646c4ce61..a7f1ee9f22e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java @@ -19,17 +19,719 @@ package org.eclipse.jetty.server; import java.io.IOException; +import java.util.Objects; +import javax.servlet.ReadListener; +import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.util.component.Destroyable; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +// tests used: RequestTest, PartialRFC2616Test, AsyncRequestReadTest, AsyncIOServletTest, GzipHandlerTest public class HttpInputOverHTTP extends HttpInput { + private static final Logger LOG = Log.getLogger(HttpInputOverHTTP.class); + + private final byte[] _oneByteBuffer = new byte[1]; + private final HttpChannelState _channelState; + + private final NotifyingSemaphore _semaphore = new NotifyingSemaphore(); + + // TODO: think about thread visibility of the below variables + private final ContentProducer _contentProducer; + private Eof _eof = Eof.NOT_YET; + private Throwable _error; + private ReadListener _readListener; + public HttpInputOverHTTP(HttpChannelState state) { - super(state); + _channelState = state; + _contentProducer = new ContentProducer(() -> ((HttpConnection)state.getHttpChannel().getEndPoint().getConnection()).parseAndFillForContent()); + } + + /* HttpInput */ + + @Override + public void recycle() + { + if (LOG.isDebugEnabled()) + LOG.debug("recycle"); + _contentProducer.recycle(); + _eof = Eof.NOT_YET; + _error = null; + _readListener = null; } @Override - protected void produceContent() throws IOException + public Interceptor getInterceptor() { - ((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).fillAndParseForContent(); + return _contentProducer.getInterceptor(); + } + + @Override + public void setInterceptor(Interceptor interceptor) + { + _contentProducer.setInterceptor(interceptor); + } + + @Override + public void addInterceptor(Interceptor interceptor) + { + _contentProducer.addInterceptor(interceptor); + } + + @Override + public void asyncReadProduce() + { + if (LOG.isDebugEnabled()) + LOG.debug("asyncReadProduce {}", _contentProducer); + _contentProducer.produceRawContent(); + } + + @Override + public boolean addContent(Content content) + { + if (LOG.isDebugEnabled()) + LOG.debug("addContent {} {}", content, _contentProducer); + long contentArrived = _contentProducer.addContent(content); + long requestContentLength = _channelState.getHttpChannel().getRequest().getContentLengthLong(); + // return false to make the parser go on, true to make it stop + // -> tell the parser to stop adding content, unless we have read everything + boolean stopParsing = requestContentLength == -1 || contentArrived < requestContentLength; + if (isAsync()) + _channelState.onContentAdded(); + return stopParsing; + } + + @Override + public boolean hasContent() + { + return _contentProducer.hasRawContent(); + } + + @Override + public void unblock() + { + if (LOG.isDebugEnabled()) + LOG.debug("signalling blocked thread to wake up"); + _semaphore.release(); + } + + @Override + public long getContentLength() + { + return _contentProducer.getRawContentArrived(); + } + + @Override + public boolean earlyEOF() + { + if (LOG.isDebugEnabled()) + LOG.debug("received early EOF"); + _eof = Eof.EARLY_EOF; + if (isAsync()) + return _channelState.onContentAdded(); + unblock(); + return false; + } + + @Override + public boolean eof() + { + if (LOG.isDebugEnabled()) + LOG.debug("received EOF"); + _eof = Eof.EOF; + if (isAsync()) + return _channelState.onContentAdded(); + unblock(); + return false; + } + + @Override + public boolean consumeAll() + { + if (LOG.isDebugEnabled()) + LOG.debug("consume all"); + _contentProducer.consumeAll(); + if (_eof.isEof()) + _eof = Eof.CONSUMED_EOF; + + if (isFinished()) + return !isError(); + + _eof = Eof.EARLY_EOF; + return false; + } + + @Override + public boolean isError() + { + return _error != null; + } + + @Override + public boolean isAsync() + { + return _readListener != null; + } + + @Override + public boolean onIdleTimeout(Throwable x) + { + //TODO implement me! + return false; + } + + @Override + public boolean failed(Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("failed " + x); + if (_error != null) + _error.addSuppressed(x); + else + _error = x; + + if (isAsync()) + return _channelState.onContentAdded(); + unblock(); + return false; + } + + /* ServletInputStream */ + + @Override + public boolean isFinished() + { + boolean finished = !_contentProducer.hasRawContent() && _eof.isConsumed(); + if (LOG.isDebugEnabled()) + LOG.debug("isFinished? {}", finished); + return finished; + } + + @Override + public boolean isReady() + { + // calling _contentProducer.hasTransformedContent() might change the _eof state, so the following test order matters + if (_contentProducer.hasTransformedContent() || _eof.isEof()) + { + if (LOG.isDebugEnabled()) + LOG.debug("isReady? true"); + return true; + } + if (LOG.isDebugEnabled()) + LOG.debug("isReady? false"); + _channelState.onReadUnready(); + return false; + } + + @Override + public void setReadListener(ReadListener readListener) + { + if (_readListener != null) + throw new IllegalStateException("ReadListener already set"); + _readListener = Objects.requireNonNull(readListener); + + if (LOG.isDebugEnabled()) + LOG.debug("setReadListener error=" + _error + " eof=" + _eof + " " + _contentProducer); + boolean woken; + if (isError()) + { + woken = _channelState.onReadReady(); + } + else + { + if (_contentProducer.hasTransformedContent()) + { + woken = _channelState.onReadReady(); + } + else if (_eof.isEof()) + { + woken = _channelState.onReadEof(); + } + else + { + _channelState.onReadUnready(); + woken = false; + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("setReadListener woken=" + woken); + if (woken) + scheduleReadListenerNotification(); + } + + private void scheduleReadListenerNotification() + { + HttpChannel channel = _channelState.getHttpChannel(); + channel.execute(channel); + } + + @Override + public int read() throws IOException + { + int read = read(_oneByteBuffer, 0, 1); + if (read == 0) + throw new IOException("unready read=0"); + return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + while (true) + { + int read = _contentProducer.read(b, off, len); + if (LOG.isDebugEnabled()) + LOG.debug("read produced {} byte(s)", read); + if (read > 0) + return read; + + if (LOG.isDebugEnabled()) + LOG.debug("read error = " + _error); + if (_error != null) + throw new IOException(_error); + + if (LOG.isDebugEnabled()) + LOG.debug("read EOF = {}", _eof); + if (_eof.isEarly()) + throw new EofException("Early EOF"); + + if (LOG.isDebugEnabled()) + LOG.debug("read async = {}", isAsync()); + if (!isAsync()) + { + if (_eof.isEof()) + { + _eof = Eof.CONSUMED_EOF; + if (LOG.isDebugEnabled()) + LOG.debug("read on EOF, switching to CONSUMED_EOF and returning"); + return -1; + } + if (LOG.isDebugEnabled()) + LOG.debug("read blocked"); + blockForContent(); + if (LOG.isDebugEnabled()) + LOG.debug("read unblocked"); + } + else + { + if (_eof.isEof()) + { + boolean wasInAsyncWait = _channelState.onReadEof(); + if (wasInAsyncWait) + scheduleReadListenerNotification(); + if (LOG.isDebugEnabled()) + LOG.debug("async read on EOF (was in async wait? {}), switching to CONSUMED_EOF and returning", wasInAsyncWait); + _eof = Eof.CONSUMED_EOF; + return -1; + } + else + { + //TODO returning 0 breaks the InputStream contract. Shouldn't IOException be thrown instead? + _channelState.getHttpChannel().onAsyncWaitForContent(); // switches on fill interested + return 0; + } + } + } + } + + @Override + public int available() + { + int available = _contentProducer.available(); + if (LOG.isDebugEnabled()) + LOG.debug("available = {}", available); + return available; + } + + private void blockForContent() + { + try + { + _semaphore.acquire(() -> + { + if (LOG.isDebugEnabled()) + LOG.debug("waiting for signal to wake up"); + _channelState.getHttpChannel().onBlockWaitForContent(); // switches on fill interested if it blocks + }); + } + catch (Throwable x) + { + _channelState.getHttpChannel().onBlockWaitForContentFailure(x); + } + } + + /* Runnable */ + + /* + *

While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link + * ContextHandler#handle(Runnable)} to setup classloaders etc.

+ */ + @Override + public void run() + { + if (LOG.isDebugEnabled()) + LOG.debug("running"); + if (!_contentProducer.hasRawContent()) + { + if (_error != null || _eof.isEarly()) + { + // TODO is this necessary to add here? + _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); + if (_error != null) + _readListener.onError(_error); + else + _readListener.onError(new EofException("Early EOF")); + } + else if (_eof.isEof()) + { + try + { + _readListener.onAllDataRead(); + } + catch (Throwable x) + { + _readListener.onError(x); + } + } + // else: !hasContent() && !error && !EOF -> no-op + } + else + { + try + { + _readListener.onDataAvailable(); + } + catch (Throwable x) + { + _readListener.onError(x); + } + } + } + + /** + * An {@link Interceptor} that chains two other {@link Interceptor}s together. + * The {@link Interceptor#readFrom(Content)} calls the previous {@link Interceptor}'s + * {@link Interceptor#readFrom(Content)} and then passes any {@link Content} returned + * to the next {@link Interceptor}. + */ + static class ChainedInterceptor implements Interceptor, Destroyable + { + private final Interceptor _prev; + private final Interceptor _next; + + public ChainedInterceptor(Interceptor prev, Interceptor next) + { + _prev = prev; + _next = next; + } + + public Interceptor getPrev() + { + return _prev; + } + + public Interceptor getNext() + { + return _next; + } + + @Override + public Content readFrom(Content content) + { + return getNext().readFrom(getPrev().readFrom(content)); + } + + @Override + public void destroy() + { + if (_prev instanceof Destroyable) + ((Destroyable)_prev).destroy(); + if (_next instanceof Destroyable) + ((Destroyable)_next).destroy(); + } + } + + enum Eof + { + NOT_YET(false, false, false), + EOF(true, false, false), + CONSUMED_EOF(true, true, false), + EARLY_EOF(true, false, true), + ; + + private final boolean _eof; + private final boolean _consumed; + private final boolean _early; + + Eof(boolean eof, boolean consumed, boolean early) + { + _eof = eof; + _consumed = consumed; + _early = early; + } + + public boolean isEof() + { + return _eof; + } + + public boolean isConsumed() + { + return _consumed; + } + + public boolean isEarly() + { + return _early; + } + } + + private static class ContentProducer + { + private final Runnable _rawContentProducer; + // TODO: think about thread visibility of the below variables + // Note: _rawContent can never be null for as long as _transformedContent is not null. + private HttpInput.Content _rawContent; + private HttpInput.Content _transformedContent; + private long _rawContentArrived; + private HttpInput.Interceptor _interceptor; + + public ContentProducer(Runnable rawContentProducer) + { + _rawContentProducer = rawContentProducer; + } + + void recycle() + { + if (LOG.isDebugEnabled()) + LOG.debug("recycle {}", this); + if (_transformedContent == _rawContent) + _transformedContent = null; + if (_transformedContent != null && !_transformedContent.isEmpty()) + _transformedContent.failed(null); + _transformedContent = null; + if (_rawContent != null && !_rawContent.isEmpty()) + _rawContent.failed(null); + _rawContent = null; + _rawContentArrived = 0L; + if (_interceptor instanceof Destroyable) + ((Destroyable)_interceptor).destroy(); + _interceptor = null; + } + + int available() + { + if (_rawContent == null) + produceRawContent(); + return _rawContent == null ? 0 : _rawContent.remaining(); + } + + long getRawContentArrived() + { + return _rawContentArrived; + } + + boolean hasRawContent() + { + return _rawContent != null; + } + + boolean hasTransformedContent() + { + if (_transformedContent != null) + return true; + if (_rawContent == null) + produceRawContent(); + produceTransformedContent(); + return _transformedContent != null; + } + + HttpInput.Interceptor getInterceptor() + { + return _interceptor; + } + + void setInterceptor(HttpInput.Interceptor interceptor) + { + this._interceptor = interceptor; + } + + void addInterceptor(HttpInput.Interceptor interceptor) + { + if (_interceptor == null) + _interceptor = interceptor; + else + _interceptor = new HttpInputOverHTTP.ChainedInterceptor(_interceptor, interceptor); + } + + long addContent(HttpInput.Content content) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} addContent {}", this, content); + if (content == null) + throw new AssertionError("Cannot add null content"); + if (_rawContent != null) + throw new AssertionError("Cannot add new content while current one hasn't been processed"); + + _rawContent = content; + _rawContentArrived += content.remaining(); + + return _rawContentArrived; + } + + void consumeAll() + { + if (LOG.isDebugEnabled()) + LOG.debug("{} consumeAll", this); + // start by depleting the current _transformedContent + if (_transformedContent != null) + { + _transformedContent.skip(_transformedContent.remaining()); + if (_transformedContent != _rawContent) + _transformedContent.succeeded(); + _transformedContent = null; + } + + // don't bother transforming content, directly deplete the raw one + while (true) + { + if (_rawContent == null) + produceRawContent(); + if (_rawContent == null) + break; + + _rawContent.skip(_rawContent.remaining()); + _rawContent.succeeded(); + _rawContent = null; + } + } + + int read(byte[] b, int off, int len) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} read", this); + while (_transformedContent == null) + { + if (_rawContent == null) + { + produceRawContent(); + if (_rawContent == null) + return 0; + } + produceTransformedContent(); + } + + int read = _transformedContent.get(b, off, len); + if (_transformedContent.isEmpty()) + produceTransformedContent(); + + return read; + } + + /** + * Call the parser so that it's going to continue parsing of the request buffer, filling it with the socket's buffer + * if needed until either the request buffer is empty with no bytes left in the socket's buffer or {@link #addContent(Content)} + * is called. + */ + void produceRawContent() + { + if (LOG.isDebugEnabled()) + LOG.debug("{} produceRawContent", this); + _rawContentProducer.run(); + } + + /** + * Read {@code _rawContent} and {@code _transformedContent} to produce the next non-empty content to work with and store it in {@code _transformedContent}, + * or store null in {@code _transformedContent} if there is no content to work with. + * Depleted content gets succeeded and its field nullified, which can happen for both {@code _rawContent} and {@code _transformedContent}. + */ + private void produceTransformedContent() + { + if (LOG.isDebugEnabled()) + LOG.debug("{} produceTransformedContent", this); + if (_interceptor == null) + { + // no interceptor set + if (_rawContent != null && _rawContent.isEmpty()) + { + _rawContent.succeeded(); + _rawContent = null; + _transformedContent = null; + } + else + { + _transformedContent = _rawContent; + } + } + else + { + // interceptor set + transformContent(); + if (_transformedContent == null) + { + if (_rawContent != null && _rawContent.isEmpty()) + { + _rawContent.succeeded(); + _rawContent = null; + } + else + { + _transformedContent = _rawContent; + } + } + } + } + + /** + * Read {@code _rawContent} and write {@code _transformedContent} to produce content using the interceptor. + * The produced content is guaranteed to either be null or not empty. + */ + private void transformContent() + { + if (LOG.isDebugEnabled()) + LOG.debug("{} transformContent", this); + if (_rawContent == null) + return; + + _transformedContent = _interceptor.readFrom(_rawContent); + + if (_transformedContent != null && _transformedContent.isEmpty()) + { + if (_transformedContent != _rawContent) + _transformedContent.succeeded(); + _transformedContent = null; + } + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "[i=" + _interceptor + ",b=" + _rawContentArrived + + ",r=" + _rawContent + ",t=" + _transformedContent + "]"; + } + } + + private static class NotifyingSemaphore + { + private int permits; + + public synchronized void acquire(Runnable onBlocking) throws InterruptedException + { + if (permits == 0) + onBlocking.run(); + while (permits == 0) + wait(); + permits--; + } + + public synchronized void release() + { + permits++; + if (permits == 1) + notify(); + } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP2.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP2.java new file mode 100644 index 00000000000..e97009217c1 --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP2.java @@ -0,0 +1,180 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.IOException; +import javax.servlet.ReadListener; + +public class HttpInputOverHTTP2 extends HttpInput +{ + private final byte[] _oneByteBuffer = new byte[1]; + private final HttpChannelState _channelState; + + public HttpInputOverHTTP2(HttpChannelState state) + { + _channelState = state; + } + + /* HttpInput */ + + @Override + public void recycle() + { + + } + + @Override + public Interceptor getInterceptor() + { + return null; + } + + @Override + public void setInterceptor(Interceptor interceptor) + { + + } + + @Override + public void addInterceptor(Interceptor interceptor) + { + + } + + @Override + public void asyncReadProduce() throws IOException + { + + } + + @Override + public boolean addContent(Content content) + { + return false; + } + + @Override + public boolean hasContent() + { + return false; + } + + @Override + public void unblock() + { + + } + + @Override + public long getContentLength() + { + return 0; + } + + @Override + public boolean earlyEOF() + { + return false; + } + + @Override + public boolean eof() + { + return false; + } + + @Override + public boolean consumeAll() + { + return false; + } + + @Override + public boolean isError() + { + return false; + } + + @Override + public boolean isAsync() + { + return false; + } + + @Override + public boolean onIdleTimeout(Throwable x) + { + return false; + } + + @Override + public boolean failed(Throwable x) + { + return false; + } + + /* ServletInputStream */ + + @Override + public boolean isFinished() + { + return false; + } + + @Override + public boolean isReady() + { + return false; + } + + @Override + public void setReadListener(ReadListener readListener) + { + + } + + @Override + public int read() throws IOException + { + int read = read(_oneByteBuffer, 0, 1); + if (read == 0) + throw new IllegalStateException("unready read=0"); + return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + return 0; + } + + @Override + public int available() throws IOException + { + return 0; + } + + /* Runnable */ + + @Override + public void run() + { + + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index a431184250a..5e3908048fb 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -749,7 +749,7 @@ public class Request implements HttpServletRequest public long getContentRead() { - return _input.getContentConsumed(); + return _input.getContentLength(); } @Override diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputAsyncStateTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputAsyncStateTest.java deleted file mode 100644 index 4401d4daa58..00000000000 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputAsyncStateTest.java +++ /dev/null @@ -1,735 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. -// -// This program and the accompanying materials are made available under -// the terms of the Eclipse Public License 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0 -// -// This Source Code may also be made available under the following -// Secondary Licenses when the conditions for such availability set -// forth in the Eclipse Public License, v. 2.0 are satisfied: -// the Apache License v2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0 -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== -// - -package org.eclipse.jetty.server; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.LinkedBlockingQueue; -import javax.servlet.ReadListener; - -import org.eclipse.jetty.server.HttpChannelState.Action; -import org.eclipse.jetty.server.HttpInput.Content; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.thread.Scheduler; -import org.hamcrest.Matchers; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.eclipse.jetty.server.HttpInput.EARLY_EOF_CONTENT; -import static org.eclipse.jetty.server.HttpInput.EOF_CONTENT; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - -/** - * this tests HttpInput and its interaction with HttpChannelState - */ - -public class HttpInputAsyncStateTest -{ - - private static final Queue __history = new LinkedBlockingQueue<>(); - private ByteBuffer _expected = BufferUtil.allocate(16 * 1024); - private boolean _eof; - private boolean _noReadInDataAvailable; - private boolean _completeInOnDataAvailable; - - private final ReadListener _listener = new ReadListener() - { - @Override - public void onError(Throwable t) - { - __history.add("onError:" + t); - } - - @Override - public void onDataAvailable() throws IOException - { - __history.add("onDataAvailable"); - if (!_noReadInDataAvailable && readAvailable() && _completeInOnDataAvailable) - { - __history.add("complete"); - _state.complete(); - } - } - - @Override - public void onAllDataRead() throws IOException - { - __history.add("onAllDataRead"); - } - }; - private HttpInput _in; - HttpChannelState _state; - - public static class TContent extends HttpInput.Content - { - public TContent(String content) - { - super(BufferUtil.toBuffer(content)); - } - } - - @BeforeEach - public void before() - { - _noReadInDataAvailable = false; - _in = new HttpInput(new HttpChannelState(new HttpChannel(new MockConnector(), new HttpConfiguration(), null, null) - { - @Override - public void onAsyncWaitForContent() - { - __history.add("onAsyncWaitForContent"); - } - - @Override - public Scheduler getScheduler() - { - return null; - } - }) - { - @Override - public void onReadUnready() - { - super.onReadUnready(); - __history.add("onReadUnready"); - } - - @Override - public boolean onContentAdded() - { - boolean wake = super.onContentAdded(); - __history.add("onReadPossible " + wake); - return wake; - } - - @Override - public boolean onReadReady() - { - boolean wake = super.onReadReady(); - __history.add("onReadReady " + wake); - return wake; - } - }) - { - @Override - public void wake() - { - __history.add("wake"); - } - }; - - _state = _in.getHttpChannelState(); - __history.clear(); - } - - private void check(String... history) - { - if (history == null || history.length == 0) - assertThat(__history, empty()); - else - assertThat(__history.toArray(new String[__history.size()]), Matchers.arrayContaining(history)); - __history.clear(); - } - - private void wake() - { - handle(null); - } - - private void handle() - { - handle(null); - } - - private void handle(Runnable run) - { - Action action = _state.handling(); - loop: - while (true) - { - switch (action) - { - case DISPATCH: - if (run == null) - fail("Run is null during DISPATCH"); - run.run(); - break; - - case READ_CALLBACK: - _in.run(); - break; - - case TERMINATED: - case WAIT: - break loop; - - case COMPLETE: - __history.add("COMPLETE"); - break; - - case READ_REGISTER: - _state.getHttpChannel().onAsyncWaitForContent(); - break; - - default: - fail("Bad Action: " + action); - } - action = _state.unhandle(); - } - } - - private void deliver(Content... content) - { - if (content != null) - { - for (Content c : content) - { - if (c == EOF_CONTENT) - { - _in.eof(); - _eof = true; - } - else if (c == HttpInput.EARLY_EOF_CONTENT) - { - _in.earlyEOF(); - _eof = true; - } - else - { - _in.addContent(c); - BufferUtil.append(_expected, c.getByteBuffer().slice()); - } - } - } - } - - boolean readAvailable() throws IOException - { - int len = 0; - try - { - while (_in.isReady()) - { - int b = _in.read(); - - if (b < 0) - { - if (len > 0) - __history.add("read " + len); - __history.add("read -1"); - assertTrue(BufferUtil.isEmpty(_expected)); - assertTrue(_eof); - return true; - } - else - { - len++; - assertFalse(BufferUtil.isEmpty(_expected)); - int a = 0xff & _expected.get(); - assertThat(b, equalTo(a)); - } - } - __history.add("read " + len); - assertTrue(BufferUtil.isEmpty(_expected)); - } - catch (IOException e) - { - if (len > 0) - __history.add("read " + len); - __history.add("read " + e); - throw e; - } - return false; - } - - @AfterEach - public void after() - { - assertThat(__history.poll(), Matchers.nullValue()); - } - - @Test - public void testInitialEmptyListenInHandle() throws Exception - { - deliver(EOF_CONTENT); - check(); - - handle(() -> - { - _state.startAsync(null); - _in.setReadListener(_listener); - check("onReadReady false"); - }); - - check("onAllDataRead"); - } - - @Test - public void testInitialEmptyListenAfterHandle() throws Exception - { - deliver(EOF_CONTENT); - - handle(() -> - { - _state.startAsync(null); - check(); - }); - - _in.setReadListener(_listener); - check("onReadReady true", "wake"); - wake(); - check("onAllDataRead"); - } - - @Test - public void testListenInHandleEmpty() throws Exception - { - handle(() -> - { - _state.startAsync(null); - _in.setReadListener(_listener); - check("onReadUnready"); - }); - - check("onAsyncWaitForContent"); - - deliver(EOF_CONTENT); - check("onReadPossible true"); - handle(); - check("onAllDataRead"); - } - - @Test - public void testEmptyListenAfterHandle() throws Exception - { - handle(() -> - { - _state.startAsync(null); - check(); - }); - - deliver(EOF_CONTENT); - check(); - - _in.setReadListener(_listener); - check("onReadReady true", "wake"); - wake(); - check("onAllDataRead"); - } - - @Test - public void testListenAfterHandleEmpty() throws Exception - { - handle(() -> - { - _state.startAsync(null); - check(); - }); - - _in.setReadListener(_listener); - check("onAsyncWaitForContent", "onReadUnready"); - - deliver(EOF_CONTENT); - check("onReadPossible true"); - - handle(); - check("onAllDataRead"); - } - - @Test - public void testInitialEarlyEOFListenInHandle() throws Exception - { - deliver(EARLY_EOF_CONTENT); - check(); - - handle(() -> - { - _state.startAsync(null); - _in.setReadListener(_listener); - check("onReadReady false"); - }); - - check("onError:org.eclipse.jetty.io.EofException: Early EOF"); - } - - @Test - public void testInitialEarlyEOFListenAfterHandle() throws Exception - { - deliver(EARLY_EOF_CONTENT); - - handle(() -> - { - _state.startAsync(null); - check(); - }); - - _in.setReadListener(_listener); - check("onReadReady true", "wake"); - wake(); - check("onError:org.eclipse.jetty.io.EofException: Early EOF"); - } - - @Test - public void testListenInHandleEarlyEOF() throws Exception - { - handle(() -> - { - _state.startAsync(null); - _in.setReadListener(_listener); - check("onReadUnready"); - }); - - check("onAsyncWaitForContent"); - - deliver(EARLY_EOF_CONTENT); - check("onReadPossible true"); - handle(); - check("onError:org.eclipse.jetty.io.EofException: Early EOF"); - } - - @Test - public void testEarlyEOFListenAfterHandle() throws Exception - { - handle(() -> - { - _state.startAsync(null); - check(); - }); - - deliver(EARLY_EOF_CONTENT); - check(); - - _in.setReadListener(_listener); - check("onReadReady true", "wake"); - wake(); - check("onError:org.eclipse.jetty.io.EofException: Early EOF"); - } - - @Test - public void testListenAfterHandleEarlyEOF() throws Exception - { - handle(() -> - { - _state.startAsync(null); - check(); - }); - - _in.setReadListener(_listener); - check("onAsyncWaitForContent", "onReadUnready"); - - deliver(EARLY_EOF_CONTENT); - check("onReadPossible true"); - - handle(); - check("onError:org.eclipse.jetty.io.EofException: Early EOF"); - } - - @Test - public void testInitialAllContentListenInHandle() throws Exception - { - deliver(new TContent("Hello"), EOF_CONTENT); - check(); - - handle(() -> - { - _state.startAsync(null); - _in.setReadListener(_listener); - check("onReadReady false"); - }); - - check("onDataAvailable", "read 5", "read -1", "onAllDataRead"); - } - - @Test - public void testInitialAllContentListenAfterHandle() throws Exception - { - deliver(new TContent("Hello"), EOF_CONTENT); - - handle(() -> - { - _state.startAsync(null); - check(); - }); - - _in.setReadListener(_listener); - check("onReadReady true", "wake"); - wake(); - check("onDataAvailable", "read 5", "read -1", "onAllDataRead"); - } - - @Test - public void testListenInHandleAllContent() throws Exception - { - handle(() -> - { - _state.startAsync(null); - _in.setReadListener(_listener); - check("onReadUnready"); - }); - - check("onAsyncWaitForContent"); - - deliver(new TContent("Hello"), EOF_CONTENT); - check("onReadPossible true", "onReadPossible false"); - handle(); - check("onDataAvailable", "read 5", "read -1", "onAllDataRead"); - } - - @Test - public void testAllContentListenAfterHandle() throws Exception - { - handle(() -> - { - _state.startAsync(null); - check(); - }); - - deliver(new TContent("Hello"), EOF_CONTENT); - check(); - - _in.setReadListener(_listener); - check("onReadReady true", "wake"); - wake(); - check("onDataAvailable", "read 5", "read -1", "onAllDataRead"); - } - - @Test - public void testListenAfterHandleAllContent() throws Exception - { - handle(() -> - { - _state.startAsync(null); - check(); - }); - - _in.setReadListener(_listener); - check("onAsyncWaitForContent", "onReadUnready"); - - deliver(new TContent("Hello"), EOF_CONTENT); - check("onReadPossible true", "onReadPossible false"); - - handle(); - check("onDataAvailable", "read 5", "read -1", "onAllDataRead"); - } - - @Test - public void testInitialIncompleteContentListenInHandle() throws Exception - { - deliver(new TContent("Hello"), EARLY_EOF_CONTENT); - check(); - - handle(() -> - { - _state.startAsync(null); - _in.setReadListener(_listener); - check("onReadReady false"); - }); - - check( - "onDataAvailable", - "read 5", - "read org.eclipse.jetty.io.EofException: Early EOF", - "onError:org.eclipse.jetty.io.EofException: Early EOF"); - } - - @Test - public void testInitialPartialContentListenAfterHandle() throws Exception - { - deliver(new TContent("Hello"), EARLY_EOF_CONTENT); - - handle(() -> - { - _state.startAsync(null); - check(); - }); - - _in.setReadListener(_listener); - check("onReadReady true", "wake"); - wake(); - check( - "onDataAvailable", - "read 5", - "read org.eclipse.jetty.io.EofException: Early EOF", - "onError:org.eclipse.jetty.io.EofException: Early EOF"); - } - - @Test - public void testListenInHandlePartialContent() throws Exception - { - handle(() -> - { - _state.startAsync(null); - _in.setReadListener(_listener); - check("onReadUnready"); - }); - - check("onAsyncWaitForContent"); - - deliver(new TContent("Hello"), EARLY_EOF_CONTENT); - check("onReadPossible true", "onReadPossible false"); - handle(); - check( - "onDataAvailable", - "read 5", - "read org.eclipse.jetty.io.EofException: Early EOF", - "onError:org.eclipse.jetty.io.EofException: Early EOF"); - } - - @Test - public void testPartialContentListenAfterHandle() throws Exception - { - handle(() -> - { - _state.startAsync(null); - check(); - }); - - deliver(new TContent("Hello"), EARLY_EOF_CONTENT); - check(); - - _in.setReadListener(_listener); - check("onReadReady true", "wake"); - wake(); - check( - "onDataAvailable", - "read 5", - "read org.eclipse.jetty.io.EofException: Early EOF", - "onError:org.eclipse.jetty.io.EofException: Early EOF"); - } - - @Test - public void testListenAfterHandlePartialContent() throws Exception - { - handle(() -> - { - _state.startAsync(null); - check(); - }); - - _in.setReadListener(_listener); - check("onAsyncWaitForContent", "onReadUnready"); - - deliver(new TContent("Hello"), EARLY_EOF_CONTENT); - check("onReadPossible true", "onReadPossible false"); - - handle(); - check( - "onDataAvailable", - "read 5", - "read org.eclipse.jetty.io.EofException: Early EOF", - "onError:org.eclipse.jetty.io.EofException: Early EOF"); - } - - @Test - public void testReadAfterOnDataAvailable() throws Exception - { - _noReadInDataAvailable = true; - handle(() -> - { - _state.startAsync(null); - _in.setReadListener(_listener); - check("onReadUnready"); - }); - - check("onAsyncWaitForContent"); - - deliver(new TContent("Hello"), EOF_CONTENT); - check("onReadPossible true", "onReadPossible false"); - - handle(); - check("onDataAvailable"); - - readAvailable(); - check("wake", "read 5", "read -1"); - wake(); - check("onAllDataRead"); - } - - @Test - public void testReadOnlyExpectedAfterOnDataAvailable() throws Exception - { - _noReadInDataAvailable = true; - handle(() -> - { - _state.startAsync(null); - _in.setReadListener(_listener); - check("onReadUnready"); - }); - - check("onAsyncWaitForContent"); - - deliver(new TContent("Hello"), EOF_CONTENT); - check("onReadPossible true", "onReadPossible false"); - - handle(); - check("onDataAvailable"); - - byte[] buffer = new byte[_expected.remaining()]; - assertThat(_in.read(buffer), equalTo(buffer.length)); - assertThat(new String(buffer), equalTo(BufferUtil.toString(_expected))); - BufferUtil.clear(_expected); - check(); - - assertTrue(_in.isReady()); - check(); - - assertThat(_in.read(), equalTo(-1)); - check("wake"); - - wake(); - check("onAllDataRead"); - } - - @Test - public void testReadAndCompleteInOnDataAvailable() throws Exception - { - _completeInOnDataAvailable = true; - handle(() -> - { - _state.startAsync(null); - _in.setReadListener(_listener); - check("onReadUnready"); - }); - - check("onAsyncWaitForContent"); - - deliver(new TContent("Hello"), EOF_CONTENT); - check("onReadPossible true", "onReadPossible false"); - - handle(() -> - { - __history.add(_state.getState().toString()); - }); - System.err.println(__history); - check( - "onDataAvailable", - "read 5", - "read -1", - "complete", - "COMPLETE" - ); - } -} diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java deleted file mode 100644 index 833fb70352b..00000000000 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java +++ /dev/null @@ -1,614 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. -// -// This program and the accompanying materials are made available under -// the terms of the Eclipse Public License 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0 -// -// This Source Code may also be made available under the following -// Secondary Licenses when the conditions for such availability set -// forth in the Eclipse Public License, v. 2.0 are satisfied: -// the Apache License v2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0 -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== -// - -package org.eclipse.jetty.server; - -import java.io.EOFException; -import java.io.IOException; -import java.util.Queue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeoutException; -import javax.servlet.ReadListener; - -import org.eclipse.jetty.util.BufferUtil; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class HttpInputTest -{ - private final Queue _history = new LinkedBlockingQueue<>(); - private final Queue _fillAndParseSimulate = new LinkedBlockingQueue<>(); - private final ReadListener _listener = new ReadListener() - { - @Override - public void onError(Throwable t) - { - _history.add("l.onError:" + t); - } - - @Override - public void onDataAvailable() throws IOException - { - _history.add("l.onDataAvailable"); - } - - @Override - public void onAllDataRead() throws IOException - { - _history.add("l.onAllDataRead"); - } - }; - private HttpInput _in; - - public class TContent extends HttpInput.Content - { - private final String _content; - - public TContent(String content) - { - super(BufferUtil.toBuffer(content)); - _content = content; - } - - @Override - public void succeeded() - { - _history.add("Content succeeded " + _content); - super.succeeded(); - } - - @Override - public void failed(Throwable x) - { - _history.add("Content failed " + _content); - super.failed(x); - } - } - - public class TestHttpInput extends HttpInput - { - public TestHttpInput(HttpChannelState state) - { - super(state); - } - - @Override - protected void produceContent() throws IOException - { - _history.add("produceContent " + _fillAndParseSimulate.size()); - - for (String s = _fillAndParseSimulate.poll(); s != null; s = _fillAndParseSimulate.poll()) - { - if ("_EOF_".equals(s)) - _in.eof(); - else - _in.addContent(new TContent(s)); - } - } - - @Override - protected void blockForContent() throws IOException - { - _history.add("blockForContent"); - super.blockForContent(); - } - } - - public class TestHttpChannelState extends HttpChannelState - { - private boolean _fakeAsyncState; - - public TestHttpChannelState(HttpChannel channel) - { - super(channel); - } - - public boolean isFakeAsyncState() - { - return _fakeAsyncState; - } - - public void setFakeAsyncState(boolean fakeAsyncState) - { - _fakeAsyncState = fakeAsyncState; - } - - @Override - public boolean isAsyncStarted() - { - if (isFakeAsyncState()) - return true; - return super.isAsyncStarted(); - } - - @Override - public void onReadUnready() - { - _history.add("s.onReadUnready"); - super.onReadUnready(); - } - - @Override - public boolean onReadPossible() - { - _history.add("s.onReadPossible"); - return super.onReadPossible(); - } - - @Override - public boolean onContentAdded() - { - _history.add("s.onDataAvailable"); - return super.onContentAdded(); - } - - @Override - public boolean onReadReady() - { - _history.add("s.onReadReady"); - return super.onReadReady(); - } - } - - @BeforeEach - public void before() - { - _in = new TestHttpInput(new TestHttpChannelState(new HttpChannel(new MockConnector(), new HttpConfiguration(), null, null) - { - @Override - public void onAsyncWaitForContent() - { - _history.add("asyncReadInterested"); - } - }) - ); - } - - @AfterEach - public void after() - { - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testEmpty() throws Exception - { - assertThat(_in.available(), equalTo(0)); - assertThat(_history.poll(), equalTo("produceContent 0")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isFinished(), equalTo(false)); - assertThat(_in.isReady(), equalTo(true)); - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testRead() throws Exception - { - _in.addContent(new TContent("AB")); - _in.addContent(new TContent("CD")); - _fillAndParseSimulate.offer("EF"); - _fillAndParseSimulate.offer("GH"); - assertThat(_in.available(), equalTo(2)); - assertThat(_in.isFinished(), equalTo(false)); - assertThat(_in.isReady(), equalTo(true)); - - assertThat(_in.getContentConsumed(), equalTo(0L)); - assertThat(_in.read(), equalTo((int)'A')); - assertThat(_in.getContentConsumed(), equalTo(1L)); - assertThat(_in.read(), equalTo((int)'B')); - assertThat(_in.getContentConsumed(), equalTo(2L)); - - assertThat(_history.poll(), equalTo("Content succeeded AB")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.read(), equalTo((int)'C')); - assertThat(_in.read(), equalTo((int)'D')); - - assertThat(_history.poll(), equalTo("Content succeeded CD")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.read(), equalTo((int)'E')); - assertThat(_in.read(), equalTo((int)'F')); - - assertThat(_history.poll(), equalTo("produceContent 2")); - assertThat(_history.poll(), equalTo("Content succeeded EF")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.read(), equalTo((int)'G')); - assertThat(_in.read(), equalTo((int)'H')); - - assertThat(_history.poll(), equalTo("Content succeeded GH")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.getContentConsumed(), equalTo(8L)); - - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testBlockingRead() throws Exception - { - new Thread(() -> - { - try - { - Thread.sleep(500); - _in.addContent(new TContent("AB")); - } - catch (Throwable th) - { - th.printStackTrace(); - } - }).start(); - - assertThat(_in.read(), equalTo((int)'A')); - - assertThat(_history.poll(), equalTo("produceContent 0")); - assertThat(_history.poll(), equalTo("blockForContent")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.read(), equalTo((int)'B')); - - assertThat(_history.poll(), equalTo("Content succeeded AB")); - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testReadEOF() throws Exception - { - _in.addContent(new TContent("AB")); - _in.addContent(new TContent("CD")); - _in.eof(); - - assertThat(_in.isFinished(), equalTo(false)); - assertThat(_in.available(), equalTo(2)); - assertThat(_in.isFinished(), equalTo(false)); - - assertThat(_in.read(), equalTo((int)'A')); - assertThat(_in.read(), equalTo((int)'B')); - assertThat(_history.poll(), equalTo("Content succeeded AB")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.read(), equalTo((int)'C')); - assertThat(_in.isFinished(), equalTo(false)); - assertThat(_in.read(), equalTo((int)'D')); - assertThat(_history.poll(), equalTo("Content succeeded CD")); - assertThat(_history.poll(), nullValue()); - assertThat(_in.isFinished(), equalTo(false)); - - assertThat(_in.read(), equalTo(-1)); - assertThat(_in.isFinished(), equalTo(true)); - - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testReadEarlyEOF() throws Exception - { - _in.addContent(new TContent("AB")); - _in.addContent(new TContent("CD")); - _in.earlyEOF(); - - assertThat(_in.isFinished(), equalTo(false)); - assertThat(_in.available(), equalTo(2)); - assertThat(_in.isFinished(), equalTo(false)); - - assertThat(_in.read(), equalTo((int)'A')); - assertThat(_in.read(), equalTo((int)'B')); - - assertThat(_in.read(), equalTo((int)'C')); - assertThat(_in.isFinished(), equalTo(false)); - assertThat(_in.read(), equalTo((int)'D')); - - assertThrows(EOFException.class, () -> _in.read()); - assertTrue(_in.isFinished()); - - assertThat(_history.poll(), equalTo("Content succeeded AB")); - assertThat(_history.poll(), equalTo("Content succeeded CD")); - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testBlockingEOF() throws Exception - { - new Thread(() -> - { - try - { - Thread.sleep(500); - _in.eof(); - } - catch (Throwable th) - { - th.printStackTrace(); - } - }).start(); - - assertThat(_in.isFinished(), equalTo(false)); - assertThat(_in.read(), equalTo(-1)); - assertThat(_in.isFinished(), equalTo(true)); - - assertThat(_history.poll(), equalTo("produceContent 0")); - assertThat(_history.poll(), equalTo("blockForContent")); - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testAsyncEmpty() throws Exception - { - ((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(true); - _in.setReadListener(_listener); - ((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(false); - assertThat(_history.poll(), equalTo("produceContent 0")); - assertThat(_history.poll(), equalTo("s.onReadUnready")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isReady(), equalTo(false)); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isReady(), equalTo(false)); - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testAsyncRead() throws Exception - { - ((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(true); - _in.setReadListener(_listener); - ((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(false); - - assertThat(_history.poll(), equalTo("produceContent 0")); - assertThat(_history.poll(), equalTo("s.onReadUnready")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isReady(), equalTo(false)); - assertThat(_history.poll(), nullValue()); - - _in.addContent(new TContent("AB")); - _fillAndParseSimulate.add("CD"); - - assertThat(_history.poll(), equalTo("s.onDataAvailable")); - assertThat(_history.poll(), nullValue()); - _in.run(); - assertThat(_history.poll(), equalTo("l.onDataAvailable")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isReady(), equalTo(true)); - assertThat(_in.read(), equalTo((int)'A')); - - assertThat(_in.isReady(), equalTo(true)); - assertThat(_in.read(), equalTo((int)'B')); - - assertThat(_history.poll(), equalTo("Content succeeded AB")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isReady(), equalTo(true)); - assertThat(_history.poll(), equalTo("produceContent 1")); - assertThat(_history.poll(), equalTo("s.onDataAvailable")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.read(), equalTo((int)'C')); - - assertThat(_in.isReady(), equalTo(true)); - assertThat(_in.read(), equalTo((int)'D')); - assertThat(_history.poll(), equalTo("Content succeeded CD")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isReady(), equalTo(false)); - assertThat(_history.poll(), equalTo("produceContent 0")); - assertThat(_history.poll(), equalTo("s.onReadUnready")); - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testAsyncEOF() throws Exception - { - ((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(true); - _in.setReadListener(_listener); - ((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(false); - assertThat(_history.poll(), equalTo("produceContent 0")); - assertThat(_history.poll(), equalTo("s.onReadUnready")); - assertThat(_history.poll(), nullValue()); - - _in.eof(); - assertThat(_in.isReady(), equalTo(true)); - assertThat(_in.isFinished(), equalTo(false)); - assertThat(_history.poll(), equalTo("s.onDataAvailable")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.read(), equalTo(-1)); - assertThat(_in.isFinished(), equalTo(true)); - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testAsyncReadEOF() throws Exception - { - ((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(true); - _in.setReadListener(_listener); - ((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(false); - assertThat(_history.poll(), equalTo("produceContent 0")); - assertThat(_history.poll(), equalTo("s.onReadUnready")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isReady(), equalTo(false)); - assertThat(_history.poll(), nullValue()); - - _in.addContent(new TContent("AB")); - _fillAndParseSimulate.add("_EOF_"); - - assertThat(_history.poll(), equalTo("s.onDataAvailable")); - assertThat(_history.poll(), nullValue()); - - _in.run(); - assertThat(_history.poll(), equalTo("l.onDataAvailable")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isReady(), equalTo(true)); - assertThat(_in.read(), equalTo((int)'A')); - - assertThat(_in.isReady(), equalTo(true)); - assertThat(_in.read(), equalTo((int)'B')); - - assertThat(_history.poll(), equalTo("Content succeeded AB")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isFinished(), equalTo(false)); - assertThat(_in.isReady(), equalTo(true)); - assertThat(_history.poll(), equalTo("produceContent 1")); - assertThat(_history.poll(), equalTo("s.onDataAvailable")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isFinished(), equalTo(false)); - assertThat(_in.read(), equalTo(-1)); - assertThat(_in.isFinished(), equalTo(true)); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isReady(), equalTo(true)); - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testAsyncError() throws Exception - { - ((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(true); - _in.setReadListener(_listener); - ((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(false); - assertThat(_history.poll(), equalTo("produceContent 0")); - assertThat(_history.poll(), equalTo("s.onReadUnready")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isReady(), equalTo(false)); - assertThat(_history.poll(), nullValue()); - - _in.failed(new TimeoutException()); - assertThat(_history.poll(), equalTo("s.onDataAvailable")); - assertThat(_history.poll(), nullValue()); - - _in.run(); - assertThat(_in.isFinished(), equalTo(true)); - assertThat(_history.poll(), equalTo("l.onError:java.util.concurrent.TimeoutException")); - assertThat(_history.poll(), nullValue()); - - assertThat(_in.isReady(), equalTo(true)); - - IOException e = assertThrows(IOException.class, () -> _in.read()); - assertThat(e.getCause(), instanceOf(TimeoutException.class)); - assertThat(_in.isFinished(), equalTo(true)); - - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testSetListenerWithNull() throws Exception - { - //test can't be null - assertThrows(NullPointerException.class, () -> - { - _in.setReadListener(null); - }); - } - - @Test - public void testSetListenerNotAsync() throws Exception - { - //test not async - assertThrows(IllegalStateException.class, () -> - { - _in.setReadListener(_listener); - }); - } - - @Test - public void testSetListenerAlreadySet() throws Exception - { - //set up a listener - ((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(true); - _in.setReadListener(_listener); - //throw away any events generated by setting the listener - _history.clear(); - ((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(false); - //now test that you can't set another listener - assertThrows(IllegalStateException.class, () -> - { - _in.setReadListener(_listener); - }); - } - - @Test - public void testRecycle() throws Exception - { - testAsyncRead(); - _in.recycle(); - testAsyncRead(); - _in.recycle(); - testReadEOF(); - } - - @Test - public void testConsumeAll() throws Exception - { - _in.addContent(new TContent("AB")); - _in.addContent(new TContent("CD")); - _fillAndParseSimulate.offer("EF"); - _fillAndParseSimulate.offer("GH"); - assertThat(_in.read(), equalTo((int)'A')); - - assertFalse(_in.consumeAll()); - assertThat(_in.getContentConsumed(), equalTo(8L)); - - assertThat(_history.poll(), equalTo("Content succeeded AB")); - assertThat(_history.poll(), equalTo("Content succeeded CD")); - assertThat(_history.poll(), equalTo("produceContent 2")); - assertThat(_history.poll(), equalTo("Content succeeded EF")); - assertThat(_history.poll(), equalTo("Content succeeded GH")); - assertThat(_history.poll(), equalTo("produceContent 0")); - assertThat(_history.poll(), nullValue()); - } - - @Test - public void testConsumeAllEOF() throws Exception - { - _in.addContent(new TContent("AB")); - _in.addContent(new TContent("CD")); - _fillAndParseSimulate.offer("EF"); - _fillAndParseSimulate.offer("GH"); - _fillAndParseSimulate.offer("_EOF_"); - assertThat(_in.read(), equalTo((int)'A')); - - assertTrue(_in.consumeAll()); - assertThat(_in.getContentConsumed(), equalTo(8L)); - - assertThat(_history.poll(), equalTo("Content succeeded AB")); - assertThat(_history.poll(), equalTo("Content succeeded CD")); - assertThat(_history.poll(), equalTo("produceContent 3")); - assertThat(_history.poll(), equalTo("Content succeeded EF")); - assertThat(_history.poll(), equalTo("Content succeeded GH")); - assertThat(_history.poll(), nullValue()); - } -}