HTTP and FCGI pass all tests (Milestone 1)

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2020-01-16 18:49:25 +01:00
parent 167eded4cf
commit 5a24c8dd17
10 changed files with 1648 additions and 2375 deletions

View File

@ -33,7 +33,10 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpInputOverFCGI;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
@ -57,6 +60,12 @@ public class HttpChannelOverFCGI extends HttpChannel
this.dispatcher = new Dispatcher(connector.getServer().getThreadPool(), this);
}
@Override
protected HttpInput newHttpInput(HttpChannelState state)
{
return new HttpInputOverFCGI(state);
}
protected void header(HttpField field)
{
String name = field.getName();

View File

@ -121,7 +121,8 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
protected HttpInput newHttpInput(HttpChannelState state)
{
return new HttpInput(state);
//TODO the HTTP2 impl instantiation should be in a subclass
return new HttpInputOverHTTP2(state);
}
protected HttpOutput newHttpOutput()

View File

@ -34,6 +34,7 @@ import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpParser.RequestHandler;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.AbstractConnection;
@ -316,21 +317,20 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
/**
* Fill and parse data looking for content
*
* @return true if an {@link RequestHandler} method was called and it returned true;
* Parse and fill data, looking for content
*/
protected boolean fillAndParseForContent()
protected void parseAndFillForContent()
{
boolean handled = false;
// parseRequestBuffer() must always be called after fillRequestBuffer() otherwise this method doesn't trigger EOF/earlyEOF
// which breaks AsyncRequestReadTest.testPartialReadThenShutdown()
int filled = Integer.MAX_VALUE;
while (_parser.inContentState())
{
int filled = fillRequestBuffer();
handled = parseRequestBuffer();
boolean handled = parseRequestBuffer();
if (handled || filled <= 0 || _input.hasContent())
break;
filled = fillRequestBuffer();
}
return handled;
}
private int fillRequestBuffer()
@ -655,8 +655,15 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override
public void succeeded()
{
if (_contentBufferReferences.decrementAndGet() == 0)
int counter = _contentBufferReferences.decrementAndGet();
if (counter == 0)
releaseRequestBuffer();
// TODO: this should do something (warn? fail?) if _contentBufferReferences goes below 0
if (counter < 0)
{
LOG.warn("Content reference counting went below zero: {}", counter);
_contentBufferReferences.incrementAndGet();
}
}
@Override

View File

@ -0,0 +1,699 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import javax.servlet.ReadListener;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.AutoLock;
import static org.eclipse.jetty.server.HttpInputOverHTTP.Eof;
public class HttpInputOverFCGI extends HttpInput
{
private static final Logger LOG = Log.getLogger(HttpInputOverFCGI.class);
private final byte[] _oneByteBuffer = new byte[1];
private final HttpChannelState _channelState;
private final AutoLock _contentLock = new AutoLock();
private final Condition _contentLockCondition = _contentLock.newCondition();
private final ContentProducer _contentProducer;
private Eof _eof = Eof.NOT_YET;
private Throwable _error;
private ReadListener _readListener;
public HttpInputOverFCGI(HttpChannelState state)
{
_channelState = state;
_contentProducer = new ContentProducer();
}
/* HttpInput */
@Override
public void recycle()
{
try (AutoLock lock = _contentLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("recycle");
_contentProducer.recycle();
_eof = Eof.NOT_YET;
_error = null;
_readListener = null;
}
}
@Override
public Interceptor getInterceptor()
{
try (AutoLock lock = _contentLock.lock())
{
return _contentProducer.getInterceptor();
}
}
@Override
public void setInterceptor(Interceptor interceptor)
{
try (AutoLock lock = _contentLock.lock())
{
_contentProducer.setInterceptor(interceptor);
}
}
@Override
public void addInterceptor(Interceptor interceptor)
{
try (AutoLock lock = _contentLock.lock())
{
_contentProducer.addInterceptor(interceptor);
}
}
@Override
public void asyncReadProduce()
{
try (AutoLock lock = _contentLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("asyncReadProduce {}", _contentProducer);
_contentProducer.produceRawContent();
}
}
@Override
public boolean addContent(Content content)
{
try (AutoLock lock = _contentLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("addContent {} {}", content, _contentProducer);
long contentArrived = _contentProducer.addContent(content);
long requestContentLength = _channelState.getHttpChannel().getRequest().getContentLengthLong();
// return false to make the parser go on, true to make it stop
// -> tell the parser to stop adding content, unless we have read everything
boolean stopParsing = requestContentLength == -1 || contentArrived < requestContentLength;
if (isAsync())
_channelState.onContentAdded();
return stopParsing;
}
}
@Override
public boolean hasContent()
{
try (AutoLock lock = _contentLock.lock())
{
return _contentProducer.hasRawContent();
}
}
@Override
public void unblock()
{
try (AutoLock lock = _contentLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("signalling blocked thread to wake up");
_contentLockCondition.signal();
}
}
@Override
public long getContentLength()
{
try (AutoLock lock = _contentLock.lock())
{
return _contentProducer.getRawContentArrived();
}
}
@Override
public boolean earlyEOF()
{
try (AutoLock lock = _contentLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("received early EOF");
_eof = Eof.EARLY_EOF;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
}
@Override
public boolean eof()
{
try (AutoLock lock = _contentLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("received EOF");
_eof = Eof.EOF;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
}
@Override
public boolean consumeAll()
{
try (AutoLock lock = _contentLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("consume all");
_contentProducer.consumeAll();
if (_eof.isEof())
_eof = Eof.CONSUMED_EOF;
if (isFinished())
return !isError();
_eof = Eof.EARLY_EOF;
return false;
}
}
@Override
public boolean isError()
{
try (AutoLock lock = _contentLock.lock())
{
return _error != null;
}
}
@Override
public boolean isAsync()
{
try (AutoLock lock = _contentLock.lock())
{
return _readListener != null;
}
}
@Override
public boolean onIdleTimeout(Throwable x)
{
//TODO implement me!
return false;
}
@Override
public boolean failed(Throwable x)
{
try (AutoLock lock = _contentLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("failed " + x);
if (_error != null)
_error.addSuppressed(x);
else
_error = x;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
}
/* ServletInputStream */
@Override
public boolean isFinished()
{
try (AutoLock lock = _contentLock.lock())
{
boolean finished = !_contentProducer.hasRawContent() && _eof.isConsumed();
if (LOG.isDebugEnabled())
LOG.debug("isFinished? {}", finished);
return finished;
}
}
@Override
public boolean isReady()
{
try (AutoLock lock = _contentLock.lock())
{
// calling _contentProducer.hasTransformedContent() might change the _eof state, so the following test order matters
if (_contentProducer.hasTransformedContent() || _eof.isEof())
{
if (LOG.isDebugEnabled())
LOG.debug("isReady? true");
return true;
}
if (LOG.isDebugEnabled())
LOG.debug("isReady? false");
_channelState.onReadUnready();
return false;
}
}
@Override
public void setReadListener(ReadListener readListener)
{
try (AutoLock lock = _contentLock.lock())
{
if (_readListener != null)
throw new IllegalStateException("ReadListener already set");
_readListener = Objects.requireNonNull(readListener);
if (LOG.isDebugEnabled())
LOG.debug("setReadListener error=" + _error + " eof=" + _eof + " " + _contentProducer);
boolean woken;
if (isError())
{
woken = _channelState.onReadReady();
}
else
{
if (_contentProducer.hasTransformedContent())
{
woken = _channelState.onReadReady();
}
else if (_eof.isEof())
{
woken = _channelState.onReadEof();
}
else
{
_channelState.onReadUnready();
woken = false;
}
}
if (LOG.isDebugEnabled())
LOG.debug("setReadListener woken=" + woken);
if (woken)
scheduleReadListenerNotification();
}
}
private void scheduleReadListenerNotification()
{
HttpChannel channel = _channelState.getHttpChannel();
channel.execute(channel);
}
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
if (read == 0)
throw new IOException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
try (AutoLock lock = _contentLock.lock())
{
while (true)
{
int read = _contentProducer.read(b, off, len);
if (LOG.isDebugEnabled())
LOG.debug("read produced {} byte(s)", read);
if (read > 0)
return read;
if (LOG.isDebugEnabled())
LOG.debug("read error = " + _error);
if (_error != null)
throw new IOException(_error);
if (LOG.isDebugEnabled())
LOG.debug("read EOF = {}", _eof);
if (_eof.isEarly())
throw new EofException("Early EOF");
if (LOG.isDebugEnabled())
LOG.debug("read async = {}", isAsync());
if (!isAsync())
{
if (_eof.isEof())
{
_eof = Eof.CONSUMED_EOF;
if (LOG.isDebugEnabled())
LOG.debug("read on EOF, switching to CONSUMED_EOF and returning");
return -1;
}
if (LOG.isDebugEnabled())
LOG.debug("read blocked");
blockForContent();
if (LOG.isDebugEnabled())
LOG.debug("read unblocked");
}
else
{
if (_eof.isEof())
{
boolean wasInAsyncWait = _channelState.onReadEof();
if (wasInAsyncWait)
scheduleReadListenerNotification();
if (LOG.isDebugEnabled())
LOG.debug("async read on EOF (was in async wait? {}), switching to CONSUMED_EOF and returning", wasInAsyncWait);
_eof = Eof.CONSUMED_EOF;
return -1;
}
else
{
//TODO returning 0 breaks the InputStream contract. Shouldn't IOException be thrown instead?
_channelState.getHttpChannel().onAsyncWaitForContent(); // switches on fill interested
return 0;
}
}
}
}
}
@Override
public int available()
{
try (AutoLock lock = _contentLock.lock())
{
int available = _contentProducer.available();
if (LOG.isDebugEnabled())
LOG.debug("available = {}", available);
return available;
}
}
private void blockForContent()
{
try
{
_channelState.getHttpChannel().onBlockWaitForContent(); // switches on fill interested
if (LOG.isDebugEnabled())
LOG.debug("waiting for signal to wake up");
_contentLockCondition.await();
if (LOG.isDebugEnabled())
LOG.debug("signalled to wake up");
}
catch (Throwable x)
{
_channelState.getHttpChannel().onBlockWaitForContentFailure(x);
}
}
/* Runnable */
/*
* <p> While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link
* ContextHandler#handle(Runnable)} to setup classloaders etc. </p>
*/
@Override
public void run()
{
try (AutoLock lock = _contentLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("running");
if (!_contentProducer.hasRawContent())
{
if (_error != null || _eof.isEarly())
{
// TODO is this necessary to add here?
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
if (_error != null)
_readListener.onError(_error);
else
_readListener.onError(new EofException("Early EOF"));
}
else if (_eof.isEof())
{
try
{
_readListener.onAllDataRead();
}
catch (Throwable x)
{
_readListener.onError(x);
}
}
// else: !hasContent() && !error && !EOF -> no-op
}
else
{
try
{
_readListener.onDataAvailable();
}
catch (Throwable x)
{
_readListener.onError(x);
}
}
}
}
private static class ContentProducer
{
// Note: _rawContent can never be null for as long as _transformedContent is not null.
private final Queue<Content> _rawContentQueue = new LinkedBlockingQueue<>();
private HttpInput.Content _currentRawContent;
private HttpInput.Content _transformedContent;
private long _rawContentArrived;
private HttpInput.Interceptor _interceptor;
void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycle {}", this);
if (_transformedContent == _currentRawContent)
_transformedContent = null;
if (_transformedContent != null && !_transformedContent.isEmpty())
_transformedContent.failed(null);
_transformedContent = null;
if (_currentRawContent != null && !_currentRawContent.isEmpty())
_currentRawContent.failed(null);
_currentRawContent = null;
_rawContentQueue.clear();
_rawContentArrived = 0L;
if (_interceptor instanceof Destroyable)
((Destroyable)_interceptor).destroy();
_interceptor = null;
}
int available()
{
if (_currentRawContent == null)
produceRawContent();
return _currentRawContent == null ? 0 : _currentRawContent.remaining();
}
long getRawContentArrived()
{
return _rawContentArrived;
}
boolean hasRawContent()
{
return _currentRawContent != null;
}
boolean hasTransformedContent()
{
if (_transformedContent != null)
return true;
if (_currentRawContent == null)
produceRawContent();
produceTransformedContent();
return _transformedContent != null;
}
HttpInput.Interceptor getInterceptor()
{
return _interceptor;
}
void setInterceptor(HttpInput.Interceptor interceptor)
{
this._interceptor = interceptor;
}
void addInterceptor(HttpInput.Interceptor interceptor)
{
if (_interceptor == null)
_interceptor = interceptor;
else
_interceptor = new HttpInputOverHTTP.ChainedInterceptor(_interceptor, interceptor);
}
long addContent(HttpInput.Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, content);
if (content == null)
throw new AssertionError("Cannot add null content");
_rawContentQueue.offer(content);
_rawContentArrived += content.remaining();
return _rawContentArrived;
}
void consumeAll()
{
if (LOG.isDebugEnabled())
LOG.debug("{} consumeAll", this);
// start by depleting the current _transformedContent
if (_transformedContent != null)
{
_transformedContent.skip(_transformedContent.remaining());
if (_transformedContent != _currentRawContent)
_transformedContent.succeeded();
_transformedContent = null;
}
// don't bother transforming content, directly deplete the raw one
while (true)
{
if (_currentRawContent == null)
produceRawContent();
if (_currentRawContent == null)
break;
_currentRawContent.skip(_currentRawContent.remaining());
_currentRawContent.succeeded();
_currentRawContent = null;
}
}
int read(byte[] b, int off, int len)
{
if (LOG.isDebugEnabled())
LOG.debug("{} read", this);
while (_transformedContent == null)
{
if (_currentRawContent == null)
{
produceRawContent();
if (_currentRawContent == null)
return 0;
}
produceTransformedContent();
}
int read = _transformedContent.get(b, off, len);
if (_transformedContent.isEmpty())
produceTransformedContent();
return read;
}
/**
* Call the parser so that it's going to continue parsing of the request buffer, filling it with the socket's buffer
* if needed until either the request buffer is empty with no bytes left in the socket's buffer or {@link #addContent(Content)}
* is called.
*/
void produceRawContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produceRawContent", this);
_currentRawContent = _rawContentQueue.poll();
}
/**
* Read {@code _rawContent} and {@code _transformedContent} to produce the next non-empty content to work with and store it in {@code _transformedContent},
* or store null in {@code _transformedContent} if there is no content to work with.
* Depleted content gets succeeded and its field nullified, which can happen for both {@code _rawContent} and {@code _transformedContent}.
*/
private void produceTransformedContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produceTransformedContent", this);
if (_interceptor == null)
{
// no interceptor set
if (_currentRawContent != null && _currentRawContent.isEmpty())
{
_currentRawContent.succeeded();
_currentRawContent = null;
_transformedContent = null;
}
else
{
_transformedContent = _currentRawContent;
}
}
else
{
// interceptor set
transformContent();
if (_transformedContent == null)
{
if (_currentRawContent != null && _currentRawContent.isEmpty())
{
_currentRawContent.succeeded();
_currentRawContent = null;
}
else
{
_transformedContent = _currentRawContent;
}
}
}
}
/**
* Read {@code _rawContent} and write {@code _transformedContent} to produce content using the interceptor.
* The produced content is guaranteed to either be null or not empty.
*/
private void transformContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} transformContent", this);
if (_currentRawContent == null)
return;
_transformedContent = _interceptor.readFrom(_currentRawContent);
if (_transformedContent != null && _transformedContent.isEmpty())
{
if (_transformedContent != _currentRawContent)
_transformedContent.succeeded();
_transformedContent = null;
}
}
@Override
public String toString()
{
return getClass().getSimpleName() + "[i=" + _interceptor + ",b=" + _rawContentArrived +
",r=" + _currentRawContent + ",t=" + _transformedContent + "]";
}
}
}

View File

@ -19,17 +19,719 @@
package org.eclipse.jetty.server;
import java.io.IOException;
import java.util.Objects;
import javax.servlet.ReadListener;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
// tests used: RequestTest, PartialRFC2616Test, AsyncRequestReadTest, AsyncIOServletTest, GzipHandlerTest
public class HttpInputOverHTTP extends HttpInput
{
private static final Logger LOG = Log.getLogger(HttpInputOverHTTP.class);
private final byte[] _oneByteBuffer = new byte[1];
private final HttpChannelState _channelState;
private final NotifyingSemaphore _semaphore = new NotifyingSemaphore();
// TODO: think about thread visibility of the below variables
private final ContentProducer _contentProducer;
private Eof _eof = Eof.NOT_YET;
private Throwable _error;
private ReadListener _readListener;
public HttpInputOverHTTP(HttpChannelState state)
{
super(state);
_channelState = state;
_contentProducer = new ContentProducer(() -> ((HttpConnection)state.getHttpChannel().getEndPoint().getConnection()).parseAndFillForContent());
}
/* HttpInput */
@Override
public void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycle");
_contentProducer.recycle();
_eof = Eof.NOT_YET;
_error = null;
_readListener = null;
}
@Override
protected void produceContent() throws IOException
public Interceptor getInterceptor()
{
((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).fillAndParseForContent();
return _contentProducer.getInterceptor();
}
@Override
public void setInterceptor(Interceptor interceptor)
{
_contentProducer.setInterceptor(interceptor);
}
@Override
public void addInterceptor(Interceptor interceptor)
{
_contentProducer.addInterceptor(interceptor);
}
@Override
public void asyncReadProduce()
{
if (LOG.isDebugEnabled())
LOG.debug("asyncReadProduce {}", _contentProducer);
_contentProducer.produceRawContent();
}
@Override
public boolean addContent(Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("addContent {} {}", content, _contentProducer);
long contentArrived = _contentProducer.addContent(content);
long requestContentLength = _channelState.getHttpChannel().getRequest().getContentLengthLong();
// return false to make the parser go on, true to make it stop
// -> tell the parser to stop adding content, unless we have read everything
boolean stopParsing = requestContentLength == -1 || contentArrived < requestContentLength;
if (isAsync())
_channelState.onContentAdded();
return stopParsing;
}
@Override
public boolean hasContent()
{
return _contentProducer.hasRawContent();
}
@Override
public void unblock()
{
if (LOG.isDebugEnabled())
LOG.debug("signalling blocked thread to wake up");
_semaphore.release();
}
@Override
public long getContentLength()
{
return _contentProducer.getRawContentArrived();
}
@Override
public boolean earlyEOF()
{
if (LOG.isDebugEnabled())
LOG.debug("received early EOF");
_eof = Eof.EARLY_EOF;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
@Override
public boolean eof()
{
if (LOG.isDebugEnabled())
LOG.debug("received EOF");
_eof = Eof.EOF;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
@Override
public boolean consumeAll()
{
if (LOG.isDebugEnabled())
LOG.debug("consume all");
_contentProducer.consumeAll();
if (_eof.isEof())
_eof = Eof.CONSUMED_EOF;
if (isFinished())
return !isError();
_eof = Eof.EARLY_EOF;
return false;
}
@Override
public boolean isError()
{
return _error != null;
}
@Override
public boolean isAsync()
{
return _readListener != null;
}
@Override
public boolean onIdleTimeout(Throwable x)
{
//TODO implement me!
return false;
}
@Override
public boolean failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed " + x);
if (_error != null)
_error.addSuppressed(x);
else
_error = x;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
/* ServletInputStream */
@Override
public boolean isFinished()
{
boolean finished = !_contentProducer.hasRawContent() && _eof.isConsumed();
if (LOG.isDebugEnabled())
LOG.debug("isFinished? {}", finished);
return finished;
}
@Override
public boolean isReady()
{
// calling _contentProducer.hasTransformedContent() might change the _eof state, so the following test order matters
if (_contentProducer.hasTransformedContent() || _eof.isEof())
{
if (LOG.isDebugEnabled())
LOG.debug("isReady? true");
return true;
}
if (LOG.isDebugEnabled())
LOG.debug("isReady? false");
_channelState.onReadUnready();
return false;
}
@Override
public void setReadListener(ReadListener readListener)
{
if (_readListener != null)
throw new IllegalStateException("ReadListener already set");
_readListener = Objects.requireNonNull(readListener);
if (LOG.isDebugEnabled())
LOG.debug("setReadListener error=" + _error + " eof=" + _eof + " " + _contentProducer);
boolean woken;
if (isError())
{
woken = _channelState.onReadReady();
}
else
{
if (_contentProducer.hasTransformedContent())
{
woken = _channelState.onReadReady();
}
else if (_eof.isEof())
{
woken = _channelState.onReadEof();
}
else
{
_channelState.onReadUnready();
woken = false;
}
}
if (LOG.isDebugEnabled())
LOG.debug("setReadListener woken=" + woken);
if (woken)
scheduleReadListenerNotification();
}
private void scheduleReadListenerNotification()
{
HttpChannel channel = _channelState.getHttpChannel();
channel.execute(channel);
}
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
if (read == 0)
throw new IOException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
while (true)
{
int read = _contentProducer.read(b, off, len);
if (LOG.isDebugEnabled())
LOG.debug("read produced {} byte(s)", read);
if (read > 0)
return read;
if (LOG.isDebugEnabled())
LOG.debug("read error = " + _error);
if (_error != null)
throw new IOException(_error);
if (LOG.isDebugEnabled())
LOG.debug("read EOF = {}", _eof);
if (_eof.isEarly())
throw new EofException("Early EOF");
if (LOG.isDebugEnabled())
LOG.debug("read async = {}", isAsync());
if (!isAsync())
{
if (_eof.isEof())
{
_eof = Eof.CONSUMED_EOF;
if (LOG.isDebugEnabled())
LOG.debug("read on EOF, switching to CONSUMED_EOF and returning");
return -1;
}
if (LOG.isDebugEnabled())
LOG.debug("read blocked");
blockForContent();
if (LOG.isDebugEnabled())
LOG.debug("read unblocked");
}
else
{
if (_eof.isEof())
{
boolean wasInAsyncWait = _channelState.onReadEof();
if (wasInAsyncWait)
scheduleReadListenerNotification();
if (LOG.isDebugEnabled())
LOG.debug("async read on EOF (was in async wait? {}), switching to CONSUMED_EOF and returning", wasInAsyncWait);
_eof = Eof.CONSUMED_EOF;
return -1;
}
else
{
//TODO returning 0 breaks the InputStream contract. Shouldn't IOException be thrown instead?
_channelState.getHttpChannel().onAsyncWaitForContent(); // switches on fill interested
return 0;
}
}
}
}
@Override
public int available()
{
int available = _contentProducer.available();
if (LOG.isDebugEnabled())
LOG.debug("available = {}", available);
return available;
}
private void blockForContent()
{
try
{
_semaphore.acquire(() ->
{
if (LOG.isDebugEnabled())
LOG.debug("waiting for signal to wake up");
_channelState.getHttpChannel().onBlockWaitForContent(); // switches on fill interested if it blocks
});
}
catch (Throwable x)
{
_channelState.getHttpChannel().onBlockWaitForContentFailure(x);
}
}
/* Runnable */
/*
* <p> While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link
* ContextHandler#handle(Runnable)} to setup classloaders etc. </p>
*/
@Override
public void run()
{
if (LOG.isDebugEnabled())
LOG.debug("running");
if (!_contentProducer.hasRawContent())
{
if (_error != null || _eof.isEarly())
{
// TODO is this necessary to add here?
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
if (_error != null)
_readListener.onError(_error);
else
_readListener.onError(new EofException("Early EOF"));
}
else if (_eof.isEof())
{
try
{
_readListener.onAllDataRead();
}
catch (Throwable x)
{
_readListener.onError(x);
}
}
// else: !hasContent() && !error && !EOF -> no-op
}
else
{
try
{
_readListener.onDataAvailable();
}
catch (Throwable x)
{
_readListener.onError(x);
}
}
}
/**
* An {@link Interceptor} that chains two other {@link Interceptor}s together.
* The {@link Interceptor#readFrom(Content)} calls the previous {@link Interceptor}'s
* {@link Interceptor#readFrom(Content)} and then passes any {@link Content} returned
* to the next {@link Interceptor}.
*/
static class ChainedInterceptor implements Interceptor, Destroyable
{
private final Interceptor _prev;
private final Interceptor _next;
public ChainedInterceptor(Interceptor prev, Interceptor next)
{
_prev = prev;
_next = next;
}
public Interceptor getPrev()
{
return _prev;
}
public Interceptor getNext()
{
return _next;
}
@Override
public Content readFrom(Content content)
{
return getNext().readFrom(getPrev().readFrom(content));
}
@Override
public void destroy()
{
if (_prev instanceof Destroyable)
((Destroyable)_prev).destroy();
if (_next instanceof Destroyable)
((Destroyable)_next).destroy();
}
}
enum Eof
{
NOT_YET(false, false, false),
EOF(true, false, false),
CONSUMED_EOF(true, true, false),
EARLY_EOF(true, false, true),
;
private final boolean _eof;
private final boolean _consumed;
private final boolean _early;
Eof(boolean eof, boolean consumed, boolean early)
{
_eof = eof;
_consumed = consumed;
_early = early;
}
public boolean isEof()
{
return _eof;
}
public boolean isConsumed()
{
return _consumed;
}
public boolean isEarly()
{
return _early;
}
}
private static class ContentProducer
{
private final Runnable _rawContentProducer;
// TODO: think about thread visibility of the below variables
// Note: _rawContent can never be null for as long as _transformedContent is not null.
private HttpInput.Content _rawContent;
private HttpInput.Content _transformedContent;
private long _rawContentArrived;
private HttpInput.Interceptor _interceptor;
public ContentProducer(Runnable rawContentProducer)
{
_rawContentProducer = rawContentProducer;
}
void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycle {}", this);
if (_transformedContent == _rawContent)
_transformedContent = null;
if (_transformedContent != null && !_transformedContent.isEmpty())
_transformedContent.failed(null);
_transformedContent = null;
if (_rawContent != null && !_rawContent.isEmpty())
_rawContent.failed(null);
_rawContent = null;
_rawContentArrived = 0L;
if (_interceptor instanceof Destroyable)
((Destroyable)_interceptor).destroy();
_interceptor = null;
}
int available()
{
if (_rawContent == null)
produceRawContent();
return _rawContent == null ? 0 : _rawContent.remaining();
}
long getRawContentArrived()
{
return _rawContentArrived;
}
boolean hasRawContent()
{
return _rawContent != null;
}
boolean hasTransformedContent()
{
if (_transformedContent != null)
return true;
if (_rawContent == null)
produceRawContent();
produceTransformedContent();
return _transformedContent != null;
}
HttpInput.Interceptor getInterceptor()
{
return _interceptor;
}
void setInterceptor(HttpInput.Interceptor interceptor)
{
this._interceptor = interceptor;
}
void addInterceptor(HttpInput.Interceptor interceptor)
{
if (_interceptor == null)
_interceptor = interceptor;
else
_interceptor = new HttpInputOverHTTP.ChainedInterceptor(_interceptor, interceptor);
}
long addContent(HttpInput.Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, content);
if (content == null)
throw new AssertionError("Cannot add null content");
if (_rawContent != null)
throw new AssertionError("Cannot add new content while current one hasn't been processed");
_rawContent = content;
_rawContentArrived += content.remaining();
return _rawContentArrived;
}
void consumeAll()
{
if (LOG.isDebugEnabled())
LOG.debug("{} consumeAll", this);
// start by depleting the current _transformedContent
if (_transformedContent != null)
{
_transformedContent.skip(_transformedContent.remaining());
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
_transformedContent = null;
}
// don't bother transforming content, directly deplete the raw one
while (true)
{
if (_rawContent == null)
produceRawContent();
if (_rawContent == null)
break;
_rawContent.skip(_rawContent.remaining());
_rawContent.succeeded();
_rawContent = null;
}
}
int read(byte[] b, int off, int len)
{
if (LOG.isDebugEnabled())
LOG.debug("{} read", this);
while (_transformedContent == null)
{
if (_rawContent == null)
{
produceRawContent();
if (_rawContent == null)
return 0;
}
produceTransformedContent();
}
int read = _transformedContent.get(b, off, len);
if (_transformedContent.isEmpty())
produceTransformedContent();
return read;
}
/**
* Call the parser so that it's going to continue parsing of the request buffer, filling it with the socket's buffer
* if needed until either the request buffer is empty with no bytes left in the socket's buffer or {@link #addContent(Content)}
* is called.
*/
void produceRawContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produceRawContent", this);
_rawContentProducer.run();
}
/**
* Read {@code _rawContent} and {@code _transformedContent} to produce the next non-empty content to work with and store it in {@code _transformedContent},
* or store null in {@code _transformedContent} if there is no content to work with.
* Depleted content gets succeeded and its field nullified, which can happen for both {@code _rawContent} and {@code _transformedContent}.
*/
private void produceTransformedContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produceTransformedContent", this);
if (_interceptor == null)
{
// no interceptor set
if (_rawContent != null && _rawContent.isEmpty())
{
_rawContent.succeeded();
_rawContent = null;
_transformedContent = null;
}
else
{
_transformedContent = _rawContent;
}
}
else
{
// interceptor set
transformContent();
if (_transformedContent == null)
{
if (_rawContent != null && _rawContent.isEmpty())
{
_rawContent.succeeded();
_rawContent = null;
}
else
{
_transformedContent = _rawContent;
}
}
}
}
/**
* Read {@code _rawContent} and write {@code _transformedContent} to produce content using the interceptor.
* The produced content is guaranteed to either be null or not empty.
*/
private void transformContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} transformContent", this);
if (_rawContent == null)
return;
_transformedContent = _interceptor.readFrom(_rawContent);
if (_transformedContent != null && _transformedContent.isEmpty())
{
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
_transformedContent = null;
}
}
@Override
public String toString()
{
return getClass().getSimpleName() + "[i=" + _interceptor + ",b=" + _rawContentArrived +
",r=" + _rawContent + ",t=" + _transformedContent + "]";
}
}
private static class NotifyingSemaphore
{
private int permits;
public synchronized void acquire(Runnable onBlocking) throws InterruptedException
{
if (permits == 0)
onBlocking.run();
while (permits == 0)
wait();
permits--;
}
public synchronized void release()
{
permits++;
if (permits == 1)
notify();
}
}
}

View File

@ -0,0 +1,180 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import javax.servlet.ReadListener;
public class HttpInputOverHTTP2 extends HttpInput
{
private final byte[] _oneByteBuffer = new byte[1];
private final HttpChannelState _channelState;
public HttpInputOverHTTP2(HttpChannelState state)
{
_channelState = state;
}
/* HttpInput */
@Override
public void recycle()
{
}
@Override
public Interceptor getInterceptor()
{
return null;
}
@Override
public void setInterceptor(Interceptor interceptor)
{
}
@Override
public void addInterceptor(Interceptor interceptor)
{
}
@Override
public void asyncReadProduce() throws IOException
{
}
@Override
public boolean addContent(Content content)
{
return false;
}
@Override
public boolean hasContent()
{
return false;
}
@Override
public void unblock()
{
}
@Override
public long getContentLength()
{
return 0;
}
@Override
public boolean earlyEOF()
{
return false;
}
@Override
public boolean eof()
{
return false;
}
@Override
public boolean consumeAll()
{
return false;
}
@Override
public boolean isError()
{
return false;
}
@Override
public boolean isAsync()
{
return false;
}
@Override
public boolean onIdleTimeout(Throwable x)
{
return false;
}
@Override
public boolean failed(Throwable x)
{
return false;
}
/* ServletInputStream */
@Override
public boolean isFinished()
{
return false;
}
@Override
public boolean isReady()
{
return false;
}
@Override
public void setReadListener(ReadListener readListener)
{
}
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
if (read == 0)
throw new IllegalStateException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
return 0;
}
@Override
public int available() throws IOException
{
return 0;
}
/* Runnable */
@Override
public void run()
{
}
}

View File

@ -749,7 +749,7 @@ public class Request implements HttpServletRequest
public long getContentRead()
{
return _input.getContentConsumed();
return _input.getContentLength();
}
@Override

View File

@ -1,735 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.ReadListener;
import org.eclipse.jetty.server.HttpChannelState.Action;
import org.eclipse.jetty.server.HttpInput.Content;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.thread.Scheduler;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.eclipse.jetty.server.HttpInput.EARLY_EOF_CONTENT;
import static org.eclipse.jetty.server.HttpInput.EOF_CONTENT;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* this tests HttpInput and its interaction with HttpChannelState
*/
public class HttpInputAsyncStateTest
{
private static final Queue<String> __history = new LinkedBlockingQueue<>();
private ByteBuffer _expected = BufferUtil.allocate(16 * 1024);
private boolean _eof;
private boolean _noReadInDataAvailable;
private boolean _completeInOnDataAvailable;
private final ReadListener _listener = new ReadListener()
{
@Override
public void onError(Throwable t)
{
__history.add("onError:" + t);
}
@Override
public void onDataAvailable() throws IOException
{
__history.add("onDataAvailable");
if (!_noReadInDataAvailable && readAvailable() && _completeInOnDataAvailable)
{
__history.add("complete");
_state.complete();
}
}
@Override
public void onAllDataRead() throws IOException
{
__history.add("onAllDataRead");
}
};
private HttpInput _in;
HttpChannelState _state;
public static class TContent extends HttpInput.Content
{
public TContent(String content)
{
super(BufferUtil.toBuffer(content));
}
}
@BeforeEach
public void before()
{
_noReadInDataAvailable = false;
_in = new HttpInput(new HttpChannelState(new HttpChannel(new MockConnector(), new HttpConfiguration(), null, null)
{
@Override
public void onAsyncWaitForContent()
{
__history.add("onAsyncWaitForContent");
}
@Override
public Scheduler getScheduler()
{
return null;
}
})
{
@Override
public void onReadUnready()
{
super.onReadUnready();
__history.add("onReadUnready");
}
@Override
public boolean onContentAdded()
{
boolean wake = super.onContentAdded();
__history.add("onReadPossible " + wake);
return wake;
}
@Override
public boolean onReadReady()
{
boolean wake = super.onReadReady();
__history.add("onReadReady " + wake);
return wake;
}
})
{
@Override
public void wake()
{
__history.add("wake");
}
};
_state = _in.getHttpChannelState();
__history.clear();
}
private void check(String... history)
{
if (history == null || history.length == 0)
assertThat(__history, empty());
else
assertThat(__history.toArray(new String[__history.size()]), Matchers.arrayContaining(history));
__history.clear();
}
private void wake()
{
handle(null);
}
private void handle()
{
handle(null);
}
private void handle(Runnable run)
{
Action action = _state.handling();
loop:
while (true)
{
switch (action)
{
case DISPATCH:
if (run == null)
fail("Run is null during DISPATCH");
run.run();
break;
case READ_CALLBACK:
_in.run();
break;
case TERMINATED:
case WAIT:
break loop;
case COMPLETE:
__history.add("COMPLETE");
break;
case READ_REGISTER:
_state.getHttpChannel().onAsyncWaitForContent();
break;
default:
fail("Bad Action: " + action);
}
action = _state.unhandle();
}
}
private void deliver(Content... content)
{
if (content != null)
{
for (Content c : content)
{
if (c == EOF_CONTENT)
{
_in.eof();
_eof = true;
}
else if (c == HttpInput.EARLY_EOF_CONTENT)
{
_in.earlyEOF();
_eof = true;
}
else
{
_in.addContent(c);
BufferUtil.append(_expected, c.getByteBuffer().slice());
}
}
}
}
boolean readAvailable() throws IOException
{
int len = 0;
try
{
while (_in.isReady())
{
int b = _in.read();
if (b < 0)
{
if (len > 0)
__history.add("read " + len);
__history.add("read -1");
assertTrue(BufferUtil.isEmpty(_expected));
assertTrue(_eof);
return true;
}
else
{
len++;
assertFalse(BufferUtil.isEmpty(_expected));
int a = 0xff & _expected.get();
assertThat(b, equalTo(a));
}
}
__history.add("read " + len);
assertTrue(BufferUtil.isEmpty(_expected));
}
catch (IOException e)
{
if (len > 0)
__history.add("read " + len);
__history.add("read " + e);
throw e;
}
return false;
}
@AfterEach
public void after()
{
assertThat(__history.poll(), Matchers.nullValue());
}
@Test
public void testInitialEmptyListenInHandle() throws Exception
{
deliver(EOF_CONTENT);
check();
handle(() ->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadReady false");
});
check("onAllDataRead");
}
@Test
public void testInitialEmptyListenAfterHandle() throws Exception
{
deliver(EOF_CONTENT);
handle(() ->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("onReadReady true", "wake");
wake();
check("onAllDataRead");
}
@Test
public void testListenInHandleEmpty() throws Exception
{
handle(() ->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("onAsyncWaitForContent");
deliver(EOF_CONTENT);
check("onReadPossible true");
handle();
check("onAllDataRead");
}
@Test
public void testEmptyListenAfterHandle() throws Exception
{
handle(() ->
{
_state.startAsync(null);
check();
});
deliver(EOF_CONTENT);
check();
_in.setReadListener(_listener);
check("onReadReady true", "wake");
wake();
check("onAllDataRead");
}
@Test
public void testListenAfterHandleEmpty() throws Exception
{
handle(() ->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("onAsyncWaitForContent", "onReadUnready");
deliver(EOF_CONTENT);
check("onReadPossible true");
handle();
check("onAllDataRead");
}
@Test
public void testInitialEarlyEOFListenInHandle() throws Exception
{
deliver(EARLY_EOF_CONTENT);
check();
handle(() ->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadReady false");
});
check("onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testInitialEarlyEOFListenAfterHandle() throws Exception
{
deliver(EARLY_EOF_CONTENT);
handle(() ->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("onReadReady true", "wake");
wake();
check("onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testListenInHandleEarlyEOF() throws Exception
{
handle(() ->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("onAsyncWaitForContent");
deliver(EARLY_EOF_CONTENT);
check("onReadPossible true");
handle();
check("onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testEarlyEOFListenAfterHandle() throws Exception
{
handle(() ->
{
_state.startAsync(null);
check();
});
deliver(EARLY_EOF_CONTENT);
check();
_in.setReadListener(_listener);
check("onReadReady true", "wake");
wake();
check("onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testListenAfterHandleEarlyEOF() throws Exception
{
handle(() ->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("onAsyncWaitForContent", "onReadUnready");
deliver(EARLY_EOF_CONTENT);
check("onReadPossible true");
handle();
check("onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testInitialAllContentListenInHandle() throws Exception
{
deliver(new TContent("Hello"), EOF_CONTENT);
check();
handle(() ->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadReady false");
});
check("onDataAvailable", "read 5", "read -1", "onAllDataRead");
}
@Test
public void testInitialAllContentListenAfterHandle() throws Exception
{
deliver(new TContent("Hello"), EOF_CONTENT);
handle(() ->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("onReadReady true", "wake");
wake();
check("onDataAvailable", "read 5", "read -1", "onAllDataRead");
}
@Test
public void testListenInHandleAllContent() throws Exception
{
handle(() ->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("onAsyncWaitForContent");
deliver(new TContent("Hello"), EOF_CONTENT);
check("onReadPossible true", "onReadPossible false");
handle();
check("onDataAvailable", "read 5", "read -1", "onAllDataRead");
}
@Test
public void testAllContentListenAfterHandle() throws Exception
{
handle(() ->
{
_state.startAsync(null);
check();
});
deliver(new TContent("Hello"), EOF_CONTENT);
check();
_in.setReadListener(_listener);
check("onReadReady true", "wake");
wake();
check("onDataAvailable", "read 5", "read -1", "onAllDataRead");
}
@Test
public void testListenAfterHandleAllContent() throws Exception
{
handle(() ->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("onAsyncWaitForContent", "onReadUnready");
deliver(new TContent("Hello"), EOF_CONTENT);
check("onReadPossible true", "onReadPossible false");
handle();
check("onDataAvailable", "read 5", "read -1", "onAllDataRead");
}
@Test
public void testInitialIncompleteContentListenInHandle() throws Exception
{
deliver(new TContent("Hello"), EARLY_EOF_CONTENT);
check();
handle(() ->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadReady false");
});
check(
"onDataAvailable",
"read 5",
"read org.eclipse.jetty.io.EofException: Early EOF",
"onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testInitialPartialContentListenAfterHandle() throws Exception
{
deliver(new TContent("Hello"), EARLY_EOF_CONTENT);
handle(() ->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("onReadReady true", "wake");
wake();
check(
"onDataAvailable",
"read 5",
"read org.eclipse.jetty.io.EofException: Early EOF",
"onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testListenInHandlePartialContent() throws Exception
{
handle(() ->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("onAsyncWaitForContent");
deliver(new TContent("Hello"), EARLY_EOF_CONTENT);
check("onReadPossible true", "onReadPossible false");
handle();
check(
"onDataAvailable",
"read 5",
"read org.eclipse.jetty.io.EofException: Early EOF",
"onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testPartialContentListenAfterHandle() throws Exception
{
handle(() ->
{
_state.startAsync(null);
check();
});
deliver(new TContent("Hello"), EARLY_EOF_CONTENT);
check();
_in.setReadListener(_listener);
check("onReadReady true", "wake");
wake();
check(
"onDataAvailable",
"read 5",
"read org.eclipse.jetty.io.EofException: Early EOF",
"onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testListenAfterHandlePartialContent() throws Exception
{
handle(() ->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("onAsyncWaitForContent", "onReadUnready");
deliver(new TContent("Hello"), EARLY_EOF_CONTENT);
check("onReadPossible true", "onReadPossible false");
handle();
check(
"onDataAvailable",
"read 5",
"read org.eclipse.jetty.io.EofException: Early EOF",
"onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testReadAfterOnDataAvailable() throws Exception
{
_noReadInDataAvailable = true;
handle(() ->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("onAsyncWaitForContent");
deliver(new TContent("Hello"), EOF_CONTENT);
check("onReadPossible true", "onReadPossible false");
handle();
check("onDataAvailable");
readAvailable();
check("wake", "read 5", "read -1");
wake();
check("onAllDataRead");
}
@Test
public void testReadOnlyExpectedAfterOnDataAvailable() throws Exception
{
_noReadInDataAvailable = true;
handle(() ->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("onAsyncWaitForContent");
deliver(new TContent("Hello"), EOF_CONTENT);
check("onReadPossible true", "onReadPossible false");
handle();
check("onDataAvailable");
byte[] buffer = new byte[_expected.remaining()];
assertThat(_in.read(buffer), equalTo(buffer.length));
assertThat(new String(buffer), equalTo(BufferUtil.toString(_expected)));
BufferUtil.clear(_expected);
check();
assertTrue(_in.isReady());
check();
assertThat(_in.read(), equalTo(-1));
check("wake");
wake();
check("onAllDataRead");
}
@Test
public void testReadAndCompleteInOnDataAvailable() throws Exception
{
_completeInOnDataAvailable = true;
handle(() ->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("onAsyncWaitForContent");
deliver(new TContent("Hello"), EOF_CONTENT);
check("onReadPossible true", "onReadPossible false");
handle(() ->
{
__history.add(_state.getState().toString());
});
System.err.println(__history);
check(
"onDataAvailable",
"read 5",
"read -1",
"complete",
"COMPLETE"
);
}
}

View File

@ -1,614 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.EOFException;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import javax.servlet.ReadListener;
import org.eclipse.jetty.util.BufferUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpInputTest
{
private final Queue<String> _history = new LinkedBlockingQueue<>();
private final Queue<String> _fillAndParseSimulate = new LinkedBlockingQueue<>();
private final ReadListener _listener = new ReadListener()
{
@Override
public void onError(Throwable t)
{
_history.add("l.onError:" + t);
}
@Override
public void onDataAvailable() throws IOException
{
_history.add("l.onDataAvailable");
}
@Override
public void onAllDataRead() throws IOException
{
_history.add("l.onAllDataRead");
}
};
private HttpInput _in;
public class TContent extends HttpInput.Content
{
private final String _content;
public TContent(String content)
{
super(BufferUtil.toBuffer(content));
_content = content;
}
@Override
public void succeeded()
{
_history.add("Content succeeded " + _content);
super.succeeded();
}
@Override
public void failed(Throwable x)
{
_history.add("Content failed " + _content);
super.failed(x);
}
}
public class TestHttpInput extends HttpInput
{
public TestHttpInput(HttpChannelState state)
{
super(state);
}
@Override
protected void produceContent() throws IOException
{
_history.add("produceContent " + _fillAndParseSimulate.size());
for (String s = _fillAndParseSimulate.poll(); s != null; s = _fillAndParseSimulate.poll())
{
if ("_EOF_".equals(s))
_in.eof();
else
_in.addContent(new TContent(s));
}
}
@Override
protected void blockForContent() throws IOException
{
_history.add("blockForContent");
super.blockForContent();
}
}
public class TestHttpChannelState extends HttpChannelState
{
private boolean _fakeAsyncState;
public TestHttpChannelState(HttpChannel channel)
{
super(channel);
}
public boolean isFakeAsyncState()
{
return _fakeAsyncState;
}
public void setFakeAsyncState(boolean fakeAsyncState)
{
_fakeAsyncState = fakeAsyncState;
}
@Override
public boolean isAsyncStarted()
{
if (isFakeAsyncState())
return true;
return super.isAsyncStarted();
}
@Override
public void onReadUnready()
{
_history.add("s.onReadUnready");
super.onReadUnready();
}
@Override
public boolean onReadPossible()
{
_history.add("s.onReadPossible");
return super.onReadPossible();
}
@Override
public boolean onContentAdded()
{
_history.add("s.onDataAvailable");
return super.onContentAdded();
}
@Override
public boolean onReadReady()
{
_history.add("s.onReadReady");
return super.onReadReady();
}
}
@BeforeEach
public void before()
{
_in = new TestHttpInput(new TestHttpChannelState(new HttpChannel(new MockConnector(), new HttpConfiguration(), null, null)
{
@Override
public void onAsyncWaitForContent()
{
_history.add("asyncReadInterested");
}
})
);
}
@AfterEach
public void after()
{
assertThat(_history.poll(), nullValue());
}
@Test
public void testEmpty() throws Exception
{
assertThat(_in.available(), equalTo(0));
assertThat(_history.poll(), equalTo("produceContent 0"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isFinished(), equalTo(false));
assertThat(_in.isReady(), equalTo(true));
assertThat(_history.poll(), nullValue());
}
@Test
public void testRead() throws Exception
{
_in.addContent(new TContent("AB"));
_in.addContent(new TContent("CD"));
_fillAndParseSimulate.offer("EF");
_fillAndParseSimulate.offer("GH");
assertThat(_in.available(), equalTo(2));
assertThat(_in.isFinished(), equalTo(false));
assertThat(_in.isReady(), equalTo(true));
assertThat(_in.getContentConsumed(), equalTo(0L));
assertThat(_in.read(), equalTo((int)'A'));
assertThat(_in.getContentConsumed(), equalTo(1L));
assertThat(_in.read(), equalTo((int)'B'));
assertThat(_in.getContentConsumed(), equalTo(2L));
assertThat(_history.poll(), equalTo("Content succeeded AB"));
assertThat(_history.poll(), nullValue());
assertThat(_in.read(), equalTo((int)'C'));
assertThat(_in.read(), equalTo((int)'D'));
assertThat(_history.poll(), equalTo("Content succeeded CD"));
assertThat(_history.poll(), nullValue());
assertThat(_in.read(), equalTo((int)'E'));
assertThat(_in.read(), equalTo((int)'F'));
assertThat(_history.poll(), equalTo("produceContent 2"));
assertThat(_history.poll(), equalTo("Content succeeded EF"));
assertThat(_history.poll(), nullValue());
assertThat(_in.read(), equalTo((int)'G'));
assertThat(_in.read(), equalTo((int)'H'));
assertThat(_history.poll(), equalTo("Content succeeded GH"));
assertThat(_history.poll(), nullValue());
assertThat(_in.getContentConsumed(), equalTo(8L));
assertThat(_history.poll(), nullValue());
}
@Test
public void testBlockingRead() throws Exception
{
new Thread(() ->
{
try
{
Thread.sleep(500);
_in.addContent(new TContent("AB"));
}
catch (Throwable th)
{
th.printStackTrace();
}
}).start();
assertThat(_in.read(), equalTo((int)'A'));
assertThat(_history.poll(), equalTo("produceContent 0"));
assertThat(_history.poll(), equalTo("blockForContent"));
assertThat(_history.poll(), nullValue());
assertThat(_in.read(), equalTo((int)'B'));
assertThat(_history.poll(), equalTo("Content succeeded AB"));
assertThat(_history.poll(), nullValue());
}
@Test
public void testReadEOF() throws Exception
{
_in.addContent(new TContent("AB"));
_in.addContent(new TContent("CD"));
_in.eof();
assertThat(_in.isFinished(), equalTo(false));
assertThat(_in.available(), equalTo(2));
assertThat(_in.isFinished(), equalTo(false));
assertThat(_in.read(), equalTo((int)'A'));
assertThat(_in.read(), equalTo((int)'B'));
assertThat(_history.poll(), equalTo("Content succeeded AB"));
assertThat(_history.poll(), nullValue());
assertThat(_in.read(), equalTo((int)'C'));
assertThat(_in.isFinished(), equalTo(false));
assertThat(_in.read(), equalTo((int)'D'));
assertThat(_history.poll(), equalTo("Content succeeded CD"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isFinished(), equalTo(false));
assertThat(_in.read(), equalTo(-1));
assertThat(_in.isFinished(), equalTo(true));
assertThat(_history.poll(), nullValue());
}
@Test
public void testReadEarlyEOF() throws Exception
{
_in.addContent(new TContent("AB"));
_in.addContent(new TContent("CD"));
_in.earlyEOF();
assertThat(_in.isFinished(), equalTo(false));
assertThat(_in.available(), equalTo(2));
assertThat(_in.isFinished(), equalTo(false));
assertThat(_in.read(), equalTo((int)'A'));
assertThat(_in.read(), equalTo((int)'B'));
assertThat(_in.read(), equalTo((int)'C'));
assertThat(_in.isFinished(), equalTo(false));
assertThat(_in.read(), equalTo((int)'D'));
assertThrows(EOFException.class, () -> _in.read());
assertTrue(_in.isFinished());
assertThat(_history.poll(), equalTo("Content succeeded AB"));
assertThat(_history.poll(), equalTo("Content succeeded CD"));
assertThat(_history.poll(), nullValue());
}
@Test
public void testBlockingEOF() throws Exception
{
new Thread(() ->
{
try
{
Thread.sleep(500);
_in.eof();
}
catch (Throwable th)
{
th.printStackTrace();
}
}).start();
assertThat(_in.isFinished(), equalTo(false));
assertThat(_in.read(), equalTo(-1));
assertThat(_in.isFinished(), equalTo(true));
assertThat(_history.poll(), equalTo("produceContent 0"));
assertThat(_history.poll(), equalTo("blockForContent"));
assertThat(_history.poll(), nullValue());
}
@Test
public void testAsyncEmpty() throws Exception
{
((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(true);
_in.setReadListener(_listener);
((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(false);
assertThat(_history.poll(), equalTo("produceContent 0"));
assertThat(_history.poll(), equalTo("s.onReadUnready"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isReady(), equalTo(false));
assertThat(_history.poll(), nullValue());
assertThat(_in.isReady(), equalTo(false));
assertThat(_history.poll(), nullValue());
}
@Test
public void testAsyncRead() throws Exception
{
((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(true);
_in.setReadListener(_listener);
((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(false);
assertThat(_history.poll(), equalTo("produceContent 0"));
assertThat(_history.poll(), equalTo("s.onReadUnready"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isReady(), equalTo(false));
assertThat(_history.poll(), nullValue());
_in.addContent(new TContent("AB"));
_fillAndParseSimulate.add("CD");
assertThat(_history.poll(), equalTo("s.onDataAvailable"));
assertThat(_history.poll(), nullValue());
_in.run();
assertThat(_history.poll(), equalTo("l.onDataAvailable"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isReady(), equalTo(true));
assertThat(_in.read(), equalTo((int)'A'));
assertThat(_in.isReady(), equalTo(true));
assertThat(_in.read(), equalTo((int)'B'));
assertThat(_history.poll(), equalTo("Content succeeded AB"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isReady(), equalTo(true));
assertThat(_history.poll(), equalTo("produceContent 1"));
assertThat(_history.poll(), equalTo("s.onDataAvailable"));
assertThat(_history.poll(), nullValue());
assertThat(_in.read(), equalTo((int)'C'));
assertThat(_in.isReady(), equalTo(true));
assertThat(_in.read(), equalTo((int)'D'));
assertThat(_history.poll(), equalTo("Content succeeded CD"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isReady(), equalTo(false));
assertThat(_history.poll(), equalTo("produceContent 0"));
assertThat(_history.poll(), equalTo("s.onReadUnready"));
assertThat(_history.poll(), nullValue());
}
@Test
public void testAsyncEOF() throws Exception
{
((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(true);
_in.setReadListener(_listener);
((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(false);
assertThat(_history.poll(), equalTo("produceContent 0"));
assertThat(_history.poll(), equalTo("s.onReadUnready"));
assertThat(_history.poll(), nullValue());
_in.eof();
assertThat(_in.isReady(), equalTo(true));
assertThat(_in.isFinished(), equalTo(false));
assertThat(_history.poll(), equalTo("s.onDataAvailable"));
assertThat(_history.poll(), nullValue());
assertThat(_in.read(), equalTo(-1));
assertThat(_in.isFinished(), equalTo(true));
assertThat(_history.poll(), nullValue());
}
@Test
public void testAsyncReadEOF() throws Exception
{
((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(true);
_in.setReadListener(_listener);
((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(false);
assertThat(_history.poll(), equalTo("produceContent 0"));
assertThat(_history.poll(), equalTo("s.onReadUnready"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isReady(), equalTo(false));
assertThat(_history.poll(), nullValue());
_in.addContent(new TContent("AB"));
_fillAndParseSimulate.add("_EOF_");
assertThat(_history.poll(), equalTo("s.onDataAvailable"));
assertThat(_history.poll(), nullValue());
_in.run();
assertThat(_history.poll(), equalTo("l.onDataAvailable"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isReady(), equalTo(true));
assertThat(_in.read(), equalTo((int)'A'));
assertThat(_in.isReady(), equalTo(true));
assertThat(_in.read(), equalTo((int)'B'));
assertThat(_history.poll(), equalTo("Content succeeded AB"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isFinished(), equalTo(false));
assertThat(_in.isReady(), equalTo(true));
assertThat(_history.poll(), equalTo("produceContent 1"));
assertThat(_history.poll(), equalTo("s.onDataAvailable"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isFinished(), equalTo(false));
assertThat(_in.read(), equalTo(-1));
assertThat(_in.isFinished(), equalTo(true));
assertThat(_history.poll(), nullValue());
assertThat(_in.isReady(), equalTo(true));
assertThat(_history.poll(), nullValue());
}
@Test
public void testAsyncError() throws Exception
{
((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(true);
_in.setReadListener(_listener);
((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(false);
assertThat(_history.poll(), equalTo("produceContent 0"));
assertThat(_history.poll(), equalTo("s.onReadUnready"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isReady(), equalTo(false));
assertThat(_history.poll(), nullValue());
_in.failed(new TimeoutException());
assertThat(_history.poll(), equalTo("s.onDataAvailable"));
assertThat(_history.poll(), nullValue());
_in.run();
assertThat(_in.isFinished(), equalTo(true));
assertThat(_history.poll(), equalTo("l.onError:java.util.concurrent.TimeoutException"));
assertThat(_history.poll(), nullValue());
assertThat(_in.isReady(), equalTo(true));
IOException e = assertThrows(IOException.class, () -> _in.read());
assertThat(e.getCause(), instanceOf(TimeoutException.class));
assertThat(_in.isFinished(), equalTo(true));
assertThat(_history.poll(), nullValue());
}
@Test
public void testSetListenerWithNull() throws Exception
{
//test can't be null
assertThrows(NullPointerException.class, () ->
{
_in.setReadListener(null);
});
}
@Test
public void testSetListenerNotAsync() throws Exception
{
//test not async
assertThrows(IllegalStateException.class, () ->
{
_in.setReadListener(_listener);
});
}
@Test
public void testSetListenerAlreadySet() throws Exception
{
//set up a listener
((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(true);
_in.setReadListener(_listener);
//throw away any events generated by setting the listener
_history.clear();
((TestHttpChannelState)_in.getHttpChannelState()).setFakeAsyncState(false);
//now test that you can't set another listener
assertThrows(IllegalStateException.class, () ->
{
_in.setReadListener(_listener);
});
}
@Test
public void testRecycle() throws Exception
{
testAsyncRead();
_in.recycle();
testAsyncRead();
_in.recycle();
testReadEOF();
}
@Test
public void testConsumeAll() throws Exception
{
_in.addContent(new TContent("AB"));
_in.addContent(new TContent("CD"));
_fillAndParseSimulate.offer("EF");
_fillAndParseSimulate.offer("GH");
assertThat(_in.read(), equalTo((int)'A'));
assertFalse(_in.consumeAll());
assertThat(_in.getContentConsumed(), equalTo(8L));
assertThat(_history.poll(), equalTo("Content succeeded AB"));
assertThat(_history.poll(), equalTo("Content succeeded CD"));
assertThat(_history.poll(), equalTo("produceContent 2"));
assertThat(_history.poll(), equalTo("Content succeeded EF"));
assertThat(_history.poll(), equalTo("Content succeeded GH"));
assertThat(_history.poll(), equalTo("produceContent 0"));
assertThat(_history.poll(), nullValue());
}
@Test
public void testConsumeAllEOF() throws Exception
{
_in.addContent(new TContent("AB"));
_in.addContent(new TContent("CD"));
_fillAndParseSimulate.offer("EF");
_fillAndParseSimulate.offer("GH");
_fillAndParseSimulate.offer("_EOF_");
assertThat(_in.read(), equalTo((int)'A'));
assertTrue(_in.consumeAll());
assertThat(_in.getContentConsumed(), equalTo(8L));
assertThat(_history.poll(), equalTo("Content succeeded AB"));
assertThat(_history.poll(), equalTo("Content succeeded CD"));
assertThat(_history.poll(), equalTo("produceContent 3"));
assertThat(_history.poll(), equalTo("Content succeeded EF"));
assertThat(_history.poll(), equalTo("Content succeeded GH"));
assertThat(_history.poll(), nullValue());
}
}