Merge all HttpInput* impls into a single class, push all differences to HttpChannel* impls (Milestone 5)

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2020-03-09 16:21:43 +01:00
parent d0f04a8eae
commit a4258ec9c0
17 changed files with 1105 additions and 1277 deletions

View File

@ -18,7 +18,9 @@
package org.eclipse.jetty.fcgi.server;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.Executor;
@ -35,7 +37,6 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpTransport;
@ -48,7 +49,7 @@ public class HttpChannelOverFCGI extends HttpChannel
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverFCGI.class);
private final Queue<HttpInput.Content> _contentQueue = new LinkedList<>();
private boolean _contentFailed;
private Throwable _contentFailure;
private final HttpFields.Mutable fields = HttpFields.build();
private final Dispatcher dispatcher;
private String method;
@ -65,21 +66,24 @@ public class HttpChannelOverFCGI extends HttpChannel
void enqueueContent(HttpInput.Content content)
{
Throwable failure;
synchronized (_contentQueue)
{
if (_contentFailed)
content.failed(null);
else
failure = _contentFailure;
if (failure == null)
_contentQueue.offer(content);
}
if (failure != null)
content.failed(failure);
}
void pushContent()
@Override
public void produceContent()
{
HttpInput.Content content;
synchronized (_contentQueue)
{
if (_contentFailed)
if (_contentFailure != null)
content = null;
else
content = _contentQueue.poll();
@ -88,25 +92,21 @@ public class HttpChannelOverFCGI extends HttpChannel
onContent(content);
}
void failContent(Throwable failure)
@Override
public void failContent(Throwable failure)
{
List<HttpInput.Content> copy;
synchronized (_contentQueue)
{
_contentFailed = true;
while (true)
{
HttpInput.Content content = _contentQueue.poll();
if (content == null)
break;
content.failed(failure);
}
}
}
if (_contentFailure == null)
_contentFailure = failure;
else if (_contentFailure != failure)
_contentFailure.addSuppressed(failure);
@Override
protected HttpInput newHttpInput(HttpChannelState state)
{
return new HttpInputOverFCGI(state);
copy = new ArrayList<>(_contentQueue);
_contentQueue.clear();
}
copy.forEach(content -> content.failed(failure));
}
protected void header(HttpField field)

View File

@ -1,42 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.fcgi.server;
import org.eclipse.jetty.server.AbstractLockedHttpInput;
import org.eclipse.jetty.server.HttpChannelState;
public class HttpInputOverFCGI extends AbstractLockedHttpInput
{
public HttpInputOverFCGI(HttpChannelState state)
{
super(state);
}
@Override
protected void produceRawContent()
{
((HttpChannelOverFCGI)_channelState.getHttpChannel()).pushContent();
}
@Override
protected void failRawContent(Throwable failure)
{
((HttpChannelOverFCGI)_channelState.getHttpChannel()).failContent(failure);
}
}

View File

@ -77,6 +77,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private Listener listener;
private long dataLength;
private long dataDemand;
private Throwable failure;
private boolean dataInitial;
private boolean dataProcess;
@ -240,7 +241,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
{
try (AutoLock l = lock.lock())
{
dataDemand = Long.MIN_VALUE;
dataDemand = 0;
failure = x;
while (true)
{
DataEntry dataEntry = dataQueue.poll();
@ -416,10 +418,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
DataEntry entry = new DataEntry(frame, callback);
try (AutoLock l = lock.lock())
{
if (dataDemand == Long.MIN_VALUE)
if (failure != null)
{
// stream has been failed
callback.failed(null);
callback.failed(failure);
return;
}
dataQueue.offer(entry);
@ -461,7 +463,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
boolean proceed = false;
try (AutoLock l = lock.lock())
{
if (dataDemand == Long.MIN_VALUE)
if (failure != null)
return; // stream has been failed
demand = dataDemand = MathUtils.cappedAdd(dataDemand, n);
if (!dataProcess)

View File

@ -41,11 +41,11 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -62,58 +62,58 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
private class RequestContent
{
private final ThreadLocal<Boolean> _syncFetchContentTl = new ThreadLocal<>();
private HttpInput.Content _content;
private boolean _endStream;
private boolean _producing;
private final AutoLock _lock = new AutoLock();
public void demand()
void setContent(boolean endStream, HttpInput.Content content)
{
if (!_producing)
try (AutoLock ignored = _lock.lock())
{
_producing = true;
_syncFetchContentTl.set(Boolean.TRUE);
try
{
getStream().demand(1);
}
finally
{
_syncFetchContentTl.remove();
}
if (_content != null)
throw new AssertionError("content cannot be queued; stream=" + getStream());
_endStream = endStream;
_content = content;
_producing = false;
}
}
public void offerContent(HttpInput.Content content)
private HttpInput.Content takeContent(boolean[] endStreamResult)
{
if (_content != null)
throw new AssertionError("content cannot be queued");
_content = content;
_producing = false;
try (AutoLock ignored = _lock.lock())
{
if (_content == null)
return null;
HttpInput.Content contentCopy = _content;
endStreamResult[0] = _endStream;
_content = null;
_endStream = false;
return contentCopy;
}
}
public HttpInput.Content takeContent()
HttpInput.Content takeContentOrDemand(boolean[] endStreamResult)
{
HttpInput.Content contentCopy = _content;
_content = null;
return contentCopy;
}
HttpInput.Content content = takeContent(endStreamResult);
if (content != null)
return content;
public void reachedEndOfStream(boolean endStream)
{
_endStream = endStream;
}
boolean demand;
try (AutoLock ignored = _lock.lock())
{
demand = !_producing;
if (demand)
{
if (_content != null)
throw new AssertionError("_content should be null");
_producing = true;
}
}
if (demand)
getStream().demand(1);
public boolean hasReachedEndOfStream()
{
boolean copy = _endStream;
_endStream = false;
return copy;
}
private boolean isFetchingContent()
{
return !Boolean.TRUE.equals(_syncFetchContentTl.get());
return takeContent(endStreamResult);
}
}
@ -162,12 +162,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
getResponse().getHttpOutput().onFlushed(bytes);
}
@Override
protected HttpInput newHttpInput(HttpChannelState state)
{
return new HttpInputOverHTTP2(state);
}
public Runnable onRequest(HeadersFrame frame)
{
try
@ -316,8 +310,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
ByteBuffer buffer = frame.getData();
int length = buffer.remaining();
_requestContent.reachedEndOfStream(frame.isEndStream());
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP2 Request #{}/{}: {} bytes of content",
@ -329,7 +321,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
boolean wasDelayed = _delayedUntilContent;
_delayedUntilContent = false;
_requestContent.offerContent(new HttpInput.Content(buffer)
_requestContent.setContent(frame.isEndStream(), new HttpInput.Content(buffer)
{
@Override
public void succeeded()
@ -351,8 +343,8 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
});
if (getState().isAsync())
{
boolean handle = _requestContent.isFetchingContent() && getState().onReadPossible();
return handle || wasDelayed ? this : null;
boolean handle = getState().onReadPossible();
return handle || wasDelayed ? this : null;
}
else
{
@ -361,30 +353,29 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
}
}
void fetchContent()
@Override
public void produceContent()
{
// HttpInputOverHttp2 calls this method via produceRawContent;
// HttpInputOverHttp2 calls this method via produceRawContent();
// this is the equivalent of Http1 parseAndFill().
HttpInput.Content content = _requestContent.takeContent();
boolean[] endStreamResult = new boolean[1];
HttpInput.Content content = _requestContent.takeContentOrDemand(endStreamResult);
if (content != null)
{
onContent(content);
if (endStreamResult[0])
{
onContentComplete();
onRequestComplete();
}
}
else
{
_requestContent.demand();
// If content was produced synchronously, consume it right away.
content = _requestContent.takeContent();
if (content != null)
onContent(content);
}
}
if (_requestContent.hasReachedEndOfStream())
{
onContentComplete();
onRequestComplete();
}
@Override
public void failContent(Throwable failure)
{
getStream().fail(failure);
}
@Override

View File

@ -1,42 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.server;
import org.eclipse.jetty.server.AbstractLockedHttpInput;
import org.eclipse.jetty.server.HttpChannelState;
public class HttpInputOverHTTP2 extends AbstractLockedHttpInput
{
public HttpInputOverHTTP2(HttpChannelState state)
{
super(state);
}
@Override
protected void produceRawContent()
{
((HttpChannelOverHTTP2)_channelState.getHttpChannel()).fetchContent();
}
@Override
protected void failRawContent(Throwable failure)
{
((HttpChannelOverHTTP2)_channelState.getHttpChannel()).getStream().fail(failure);
}
}

View File

@ -30,7 +30,6 @@ import org.eclipse.jetty.server.Authentication;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
@ -64,9 +63,13 @@ public class SpnegoAuthenticatorTest
}
@Override
protected HttpInput newHttpInput(HttpChannelState state)
public void produceContent()
{
}
@Override
public void failContent(Throwable failure)
{
return null;
}
@Override
@ -105,9 +108,13 @@ public class SpnegoAuthenticatorTest
}
@Override
protected HttpInput newHttpInput(HttpChannelState state)
public void produceContent()
{
}
@Override
public void failContent(Throwable failure)
{
return null;
}
@Override

View File

@ -1,767 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import javax.servlet.ReadListener;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.AutoLock;
public abstract class AbstractHttpInput extends HttpInput
{
private static final Logger LOG = Log.getLogger(AbstractHttpInput.class);
private final byte[] _oneByteBuffer = new byte[1];
protected final HttpChannelState _channelState;
protected final ContentProducer _contentProducer;
protected final AutoLock _contentLock = new AutoLock();
protected final Condition _contentLockCondition = _contentLock.newCondition();
private Eof _eof = Eof.NOT_YET;
private Throwable _error;
private ReadListener _readListener;
private long _firstByteTimeStamp = Long.MIN_VALUE;
public AbstractHttpInput(HttpChannelState state)
{
_channelState = state;
_contentProducer = new ContentProducer(this::produceRawContent);
}
/* HttpInput */
@Override
public void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycle");
_contentProducer.recycle();
_eof = Eof.NOT_YET;
_error = null;
_readListener = null;
_firstByteTimeStamp = Long.MIN_VALUE;
}
@Override
public Interceptor getInterceptor()
{
return _contentProducer.getInterceptor();
}
@Override
public void setInterceptor(Interceptor interceptor)
{
_contentProducer.setInterceptor(interceptor);
}
@Override
public void addInterceptor(Interceptor interceptor)
{
_contentProducer.addInterceptor(interceptor);
}
@Override
public void asyncReadProduce()
{
if (LOG.isDebugEnabled())
LOG.debug("asyncReadProduce {}", _contentProducer);
_contentProducer.produceRawContent();
}
@Override
public void addContent(Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("addContent {} {}", content, _contentProducer);
if (_firstByteTimeStamp == Long.MIN_VALUE)
{
_firstByteTimeStamp = System.nanoTime();
if (_firstByteTimeStamp == Long.MIN_VALUE)
_firstByteTimeStamp++;
}
_contentProducer.addContent(content);
if (isAsync())
_channelState.onContentAdded();
}
@Override
public boolean hasContent()
{
return _contentProducer.hasRawContent();
}
@Override
public void unblock()
{
try (AutoLock lock = _contentLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("signalling blocked thread to wake up");
_contentLockCondition.signal();
}
}
@Override
public long getContentLength()
{
return _contentProducer.getRawContentArrived();
}
@Override
public boolean earlyEOF()
{
if (LOG.isDebugEnabled())
LOG.debug("received early EOF");
_eof = Eof.EARLY_EOF;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
@Override
public boolean eof()
{
if (LOG.isDebugEnabled())
LOG.debug("received EOF");
_eof = Eof.EOF;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
@Override
public boolean consumeAll()
{
if (LOG.isDebugEnabled())
LOG.debug("consume all");
_contentProducer.consumeTransformedContent(() -> failRawContent(new IOException("Unconsumed content")));
if (_eof.isEof())
_eof = Eof.CONSUMED_EOF;
if (isFinished())
return !isError();
_eof = Eof.EARLY_EOF;
return false;
}
@Override
public boolean isError()
{
return _error != null;
}
@Override
public boolean isAsync()
{
return _readListener != null;
}
@Override
public boolean onIdleTimeout(Throwable x)
{
boolean neverDispatched = _channelState.isIdle();
boolean waitingForContent = _contentProducer.available() == 0 && !_eof.isEof();
if ((waitingForContent || neverDispatched) && !isError())
{
x.addSuppressed(new Throwable("HttpInput idle timeout"));
_error = x;
if (isAsync())
return _channelState.onContentAdded();
unblock();
}
return false;
}
@Override
public boolean failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed " + x);
if (_error != null)
_error.addSuppressed(x);
else
_error = x;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
/* ServletInputStream */
@Override
public boolean isFinished()
{
boolean finished = !_contentProducer.hasRawContent() && _eof.isConsumed();
if (LOG.isDebugEnabled())
LOG.debug("isFinished? {}", finished);
return finished;
}
@Override
public boolean isReady()
{
// calling _contentProducer.available() might change the _eof state, so the following test order matters
if (_contentProducer.available() > 0 || _eof.isEof())
{
if (LOG.isDebugEnabled())
LOG.debug("isReady? true");
return true;
}
if (LOG.isDebugEnabled())
LOG.debug("isReady? false");
_channelState.onReadUnready();
return false;
}
@Override
public void setReadListener(ReadListener readListener)
{
if (_readListener != null)
throw new IllegalStateException("ReadListener already set");
_readListener = Objects.requireNonNull(readListener);
if (LOG.isDebugEnabled())
LOG.debug("setReadListener error=" + _error + " eof=" + _eof + " " + _contentProducer);
boolean woken;
if (isError())
{
woken = _channelState.onReadReady();
}
else
{
if (_contentProducer.available() > 0)
{
woken = _channelState.onReadReady();
}
else if (_eof.isEof())
{
woken = _channelState.onReadEof();
}
else
{
_channelState.onReadUnready();
woken = false;
}
}
if (LOG.isDebugEnabled())
LOG.debug("setReadListener woken=" + woken);
if (woken)
scheduleReadListenerNotification();
}
private void scheduleReadListenerNotification()
{
HttpChannel channel = _channelState.getHttpChannel();
channel.execute(channel);
}
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
if (read == 0)
throw new IOException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
// Calculate minimum request rate for DOS protection
long minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate();
if (minRequestDataRate > 0 && _firstByteTimeStamp != Long.MIN_VALUE)
{
long period = System.nanoTime() - _firstByteTimeStamp;
if (period > 0)
{
long minimumData = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1);
if (_contentProducer.getRawContentArrived() < minimumData)
{
BadMessageException bad = new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,
String.format("Request content data rate < %d B/s", minRequestDataRate));
if (_channelState.isResponseCommitted())
_channelState.getHttpChannel().abort(bad);
throw bad;
}
}
}
while (true)
{
int read = _contentProducer.read(b, off, len);
if (LOG.isDebugEnabled())
LOG.debug("read produced {} byte(s)", read);
if (read > 0)
return read;
if (LOG.isDebugEnabled())
LOG.debug("read error = " + _error);
if (_error != null)
throw new IOException(_error);
if (LOG.isDebugEnabled())
LOG.debug("read EOF = {}", _eof);
if (_eof.isEarly())
throw new EofException("Early EOF");
if (LOG.isDebugEnabled())
LOG.debug("read async = {}", isAsync());
if (!isAsync())
{
if (_eof.isEof())
{
_eof = Eof.CONSUMED_EOF;
if (LOG.isDebugEnabled())
LOG.debug("read on EOF, switching to CONSUMED_EOF and returning");
return -1;
}
if (LOG.isDebugEnabled())
LOG.debug("read blocked");
blockForContent();
if (LOG.isDebugEnabled())
LOG.debug("read unblocked");
}
else
{
if (_eof.isEof())
{
boolean wasInAsyncWait = _channelState.onReadEof();
if (wasInAsyncWait)
scheduleReadListenerNotification();
if (LOG.isDebugEnabled())
LOG.debug("async read on EOF (was in async wait? {}), switching to CONSUMED_EOF and returning", wasInAsyncWait);
_eof = Eof.CONSUMED_EOF;
return -1;
}
else
{
//TODO returning 0 breaks the InputStream contract. Shouldn't IOException be thrown instead?
_channelState.getHttpChannel().onAsyncWaitForContent(); // switches on fill interested
return 0;
}
}
}
}
@Override
public int available()
{
int available = _contentProducer.available();
if (LOG.isDebugEnabled())
LOG.debug("available = {}", available);
return available;
}
private void blockForContent()
{
try (AutoLock lock = _contentLock.lock())
{
_channelState.getHttpChannel().onBlockWaitForContent(); // switches on fill interested
if (LOG.isDebugEnabled())
LOG.debug("waiting for signal to wake up");
_contentLockCondition.await();
if (LOG.isDebugEnabled())
LOG.debug("signalled to wake up");
}
catch (Throwable x)
{
_channelState.getHttpChannel().onBlockWaitForContentFailure(x);
}
}
/* Runnable */
/*
* <p> While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link
* ContextHandler#handle(Runnable)} to setup classloaders etc. </p>
*/
@Override
public void run()
{
if (!_contentProducer.hasRawContent())
{
if (LOG.isDebugEnabled())
LOG.debug("running has no raw content; error: {}, EOF = {}", _error, _eof);
if (_error != null || _eof.isEarly())
{
// TODO is this necessary to add here?
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
if (_error != null)
_readListener.onError(_error);
else
_readListener.onError(new EofException("Early EOF"));
}
else if (_eof.isEof())
{
try
{
_readListener.onAllDataRead();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("running failed onAllDataRead", x);
_readListener.onError(x);
}
}
// else: !hasContent() && !error && !EOF -> no-op
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("running has raw content");
try
{
_readListener.onDataAvailable();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("running failed onDataAvailable", x);
_readListener.onError(x);
}
}
}
protected abstract void produceRawContent();
protected abstract void failRawContent(Throwable failure);
/**
* An {@link Interceptor} that chains two other {@link Interceptor}s together.
* The {@link Interceptor#readFrom(Content)} calls the previous {@link Interceptor}'s
* {@link Interceptor#readFrom(Content)} and then passes any {@link Content} returned
* to the next {@link Interceptor}.
*/
static class ChainedInterceptor implements Interceptor, Destroyable
{
private final Interceptor _prev;
private final Interceptor _next;
public ChainedInterceptor(Interceptor prev, Interceptor next)
{
_prev = prev;
_next = next;
}
public Interceptor getPrev()
{
return _prev;
}
public Interceptor getNext()
{
return _next;
}
@Override
public Content readFrom(Content content)
{
return getNext().readFrom(getPrev().readFrom(content));
}
@Override
public void destroy()
{
if (_prev instanceof Destroyable)
((Destroyable)_prev).destroy();
if (_next instanceof Destroyable)
((Destroyable)_next).destroy();
}
}
enum Eof
{
NOT_YET(false, false, false),
EOF(true, false, false),
CONSUMED_EOF(true, true, false),
EARLY_EOF(true, false, true),
;
private final boolean _eof;
private final boolean _consumed;
private final boolean _early;
Eof(boolean eof, boolean consumed, boolean early)
{
_eof = eof;
_consumed = consumed;
_early = early;
}
public boolean isEof()
{
return _eof;
}
public boolean isConsumed()
{
return _consumed;
}
public boolean isEarly()
{
return _early;
}
}
protected static class ContentProducer
{
private final Runnable _rawContentProducer;
// Note: _rawContent can never be null for as long as _transformedContent is not null.
private Content _rawContent;
private Content _transformedContent;
private long _rawContentArrived;
private Interceptor _interceptor;
private boolean _allConsumed;
public ContentProducer(Runnable rawContentProducer)
{
_rawContentProducer = rawContentProducer;
}
void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycle {}", this);
if (_transformedContent == _rawContent)
_transformedContent = null;
if (_transformedContent != null && !_transformedContent.isEmpty())
_transformedContent.failed(null);
_transformedContent = null;
if (_rawContent != null && !_rawContent.isEmpty())
_rawContent.failed(null);
_rawContent = null;
_rawContentArrived = 0L;
if (_interceptor instanceof Destroyable)
((Destroyable)_interceptor).destroy();
_interceptor = null;
_allConsumed = false;
}
int available()
{
if (_transformedContent != null)
return _transformedContent.remaining();
if (_rawContent == null)
produceRawContent();
produceTransformedContent();
return _transformedContent == null ? 0 : _transformedContent.remaining();
}
long getRawContentArrived()
{
return _rawContentArrived;
}
boolean hasRawContent()
{
return _rawContent != null;
}
Interceptor getInterceptor()
{
return _interceptor;
}
void setInterceptor(Interceptor interceptor)
{
this._interceptor = interceptor;
}
void addInterceptor(Interceptor interceptor)
{
if (_interceptor == null)
_interceptor = interceptor;
else
_interceptor = new ChainedInterceptor(_interceptor, interceptor);
}
void addContent(Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, content);
if (content == null)
throw new AssertionError("Cannot add null content");
if (_allConsumed)
{
content.failed(null);
return;
}
if (_rawContent != null)
throw new AssertionError("Cannot add new content while current one hasn't been processed");
_rawContent = content;
_rawContentArrived += content.remaining();
}
void consumeTransformedContent(Runnable failRawContent)
{
if (LOG.isDebugEnabled())
LOG.debug("{} consumeTransformedContent", this);
// start by depleting the current _transformedContent
if (_transformedContent != null)
{
_transformedContent.skip(_transformedContent.remaining());
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
_transformedContent = null;
}
// don't bother transforming content, directly deplete the raw one
consumeRawContent();
// fail whatever other content the producer may have
failRawContent.run();
_allConsumed = true;
}
void consumeRawContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} consumeRawContent", this);
if (_rawContent != null)
{
_rawContent.skip(_rawContent.remaining());
_rawContent.succeeded();
_rawContent = null;
}
}
int read(byte[] b, int off, int len)
{
if (LOG.isDebugEnabled())
LOG.debug("{} read", this);
while (_transformedContent == null)
{
if (_rawContent == null)
{
produceRawContent();
if (_rawContent == null)
return 0;
}
produceTransformedContent();
}
int read = _transformedContent.get(b, off, len);
if (_transformedContent.isEmpty())
produceTransformedContent(); //TODO: this should be something like cleanupTransformedContent() instead
return read;
}
/**
* Call the parser so that it's going to continue parsing of the request buffer, filling it with the socket's buffer
* if needed until either the request buffer is empty with no bytes left in the socket's buffer or {@link #addContent(Content)}
* is called.
*/
void produceRawContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produceRawContent", this);
_rawContentProducer.run();
}
/**
* Read {@code _rawContent} and {@code _transformedContent} to produce the next non-empty content to work with and store it in {@code _transformedContent},
* or store null in {@code _transformedContent} if there is no content to work with.
* Depleted content gets succeeded and its field nullified, which can happen for both {@code _rawContent} and {@code _transformedContent}.
*/
private void produceTransformedContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produceTransformedContent", this);
if (_interceptor == null)
{
// no interceptor set
if (_rawContent != null && _rawContent.isEmpty())
{
_rawContent.succeeded();
_rawContent = null;
_transformedContent = null;
}
else
{
_transformedContent = _rawContent;
}
}
else
{
// interceptor set
transformContent();
if (_transformedContent == null)
{
if (_rawContent != null && _rawContent.isEmpty())
{
_rawContent.succeeded();
_rawContent = null;
}
else
{
_transformedContent = _rawContent;
}
}
}
}
/**
* Read {@code _rawContent} and write {@code _transformedContent} to produce content using the interceptor.
* The produced content is guaranteed to either be null or not empty.
*/
private void transformContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} transformContent", this);
if (_rawContent == null)
return;
_transformedContent = _interceptor.readFrom(_rawContent);
if (_transformedContent != null && _transformedContent.isEmpty())
{
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
_transformedContent = null;
}
}
@Override
public String toString()
{
return getClass().getSimpleName() + "[i=" + _interceptor + ",b=" + _rawContentArrived +
",r=" + _rawContent + ",t=" + _transformedContent + "]";
}
}
}

View File

@ -1,236 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import javax.servlet.ReadListener;
import org.eclipse.jetty.util.thread.AutoLock;
public abstract class AbstractLockedHttpInput extends AbstractHttpInput
{
public AbstractLockedHttpInput(HttpChannelState state)
{
super(state);
}
/* HttpInput */
@Override
public void recycle()
{
try (AutoLock lock = _contentLock.lock())
{
super.recycle();
}
}
@Override
public Interceptor getInterceptor()
{
try (AutoLock lock = _contentLock.lock())
{
return super.getInterceptor();
}
}
@Override
public void setInterceptor(Interceptor interceptor)
{
try (AutoLock lock = _contentLock.lock())
{
super.setInterceptor(interceptor);
}
}
@Override
public void addInterceptor(Interceptor interceptor)
{
try (AutoLock lock = _contentLock.lock())
{
super.addInterceptor(interceptor);
}
}
@Override
public void asyncReadProduce()
{
try (AutoLock lock = _contentLock.lock())
{
super.asyncReadProduce();
}
}
@Override
public void addContent(Content content)
{
try (AutoLock lock = _contentLock.lock())
{
super.addContent(content);
}
}
@Override
public boolean hasContent()
{
try (AutoLock lock = _contentLock.lock())
{
return super.hasContent();
}
}
@Override
public long getContentLength()
{
try (AutoLock lock = _contentLock.lock())
{
return super.getContentLength();
}
}
@Override
public boolean earlyEOF()
{
try (AutoLock lock = _contentLock.lock())
{
return super.earlyEOF();
}
}
@Override
public boolean eof()
{
try (AutoLock lock = _contentLock.lock())
{
return super.eof();
}
}
@Override
public boolean consumeAll()
{
try (AutoLock lock = _contentLock.lock())
{
return super.consumeAll();
}
}
@Override
public boolean isError()
{
try (AutoLock lock = _contentLock.lock())
{
return super.isError();
}
}
@Override
public boolean isAsync()
{
try (AutoLock lock = _contentLock.lock())
{
return super.isAsync();
}
}
@Override
public boolean onIdleTimeout(Throwable x)
{
try (AutoLock lock = _contentLock.lock())
{
return super.onIdleTimeout(x);
}
}
@Override
public boolean failed(Throwable x)
{
try (AutoLock lock = _contentLock.lock())
{
return super.failed(x);
}
}
/* ServletInputStream */
@Override
public boolean isFinished()
{
try (AutoLock lock = _contentLock.lock())
{
return super.isFinished();
}
}
@Override
public boolean isReady()
{
try (AutoLock lock = _contentLock.lock())
{
return super.isReady();
}
}
@Override
public void setReadListener(ReadListener readListener)
{
try (AutoLock lock = _contentLock.lock())
{
super.setReadListener(readListener);
}
}
@Override
public int read() throws IOException
{
try (AutoLock lock = _contentLock.lock())
{
return super.read();
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
try (AutoLock lock = _contentLock.lock())
{
return super.read(b, off, len);
}
}
@Override
public int available()
{
try (AutoLock lock = _contentLock.lock())
{
return super.available();
}
}
/* Runnable */
@Override
public void run()
{
try (AutoLock lock = _contentLock.lock())
{
super.run();
}
}
}

View File

@ -119,7 +119,14 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
return _state.isSendError();
}
protected abstract HttpInput newHttpInput(HttpChannelState state);
private HttpInput newHttpInput(HttpChannelState state)
{
return new HttpInput(state);
}
public abstract void produceContent();
public abstract void failContent(Throwable failure);
protected HttpOutput newHttpOutput()
{

View File

@ -75,6 +75,18 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
_httpConnection.getGenerator().setPersistent(false);
}
@Override
public void produceContent()
{
((HttpConnection)getEndPoint().getConnection()).parseAndFillForContent();
}
@Override
public void failContent(Throwable failure)
{
((HttpConnection)getEndPoint().getConnection()).failContent(failure);
}
@Override
public void badMessage(BadMessageException failure)
{
@ -458,12 +470,6 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
super.handleException(x);
}
@Override
protected HttpInput newHttpInput(HttpChannelState state)
{
return new HttpInputOverHTTP(state);
}
/**
* <p>Attempts to perform an HTTP/1.1 upgrade.</p>
* <p>The upgrade looks up a {@link ConnectionFactory.Upgrading} from the connector

View File

@ -1370,6 +1370,11 @@ public class HttpChannelState
}
break;
case IDLE:
case READY:
case REGISTER:
break;
default:
throw new IllegalStateException(toStringLocked());
}

View File

@ -319,7 +319,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
/**
* Parse and fill data, looking for content
*/
protected void parseAndFillForContent()
void parseAndFillForContent()
{
// parseRequestBuffer() must always be called after fillRequestBuffer() otherwise this method doesn't trigger EOF/earlyEOF
// which breaks AsyncRequestReadTest.testPartialReadThenShutdown()
@ -333,6 +333,20 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
void failContent(Throwable failure)
{
int filled = Integer.MAX_VALUE;
while (_parser.inContentState())
{
// The parser is going generate and forward contents to the HttpInput
// so it's up to it to fail them individually.
parseRequestBuffer();
if (filled <= 0 || _input.hasContent())
break;
filled = fillRequestBuffer();
}
}
private int fillRequestBuffer()
{
if (_contentBufferReferences.get() > 0)

View File

@ -20,61 +20,152 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p> This would be an interface if ServletInputStream was an interface too.</p>
* <p> While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link
* ContextHandler#handle(Runnable)} to setup classloaders etc. </p>
*/
public abstract class HttpInput extends ServletInputStream implements Runnable
public class HttpInput extends ServletInputStream implements Runnable
{
private static final Logger LOG = Log.getLogger(HttpInput.class);
public abstract void recycle();
private final byte[] _oneByteBuffer = new byte[1];
private final HttpChannelState _channelState;
private final ContentProducer _contentProducer = new ContentProducer();
// This semaphore is only used in blocking mode, and a standard lock with a condition variable
// cannot work here because there is a race condition between the _contentProducer.read() call
// and the blockForContent() call: content can be produced any time between these two calls so
// the call to unblock() done by the content-producing thread to wake up the user thread executing read()
// must 'remember' the unblock() call, such as if it happens before the thread executing read() reaches the
// blockForContent() method, it will not get stuck in it forever waiting for an unblock() call it missed.
private final Semaphore _semaphore = new Semaphore(0);
private Eof _eof = Eof.NOT_YET;
private Throwable _error;
private ReadListener _readListener;
private long _firstByteTimeStamp = Long.MIN_VALUE;
public HttpInput(HttpChannelState state)
{
_channelState = state;
}
/* HttpInput */
public void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycle");
_contentProducer.recycle();
_eof = Eof.NOT_YET;
_error = null;
_readListener = null;
_firstByteTimeStamp = Long.MIN_VALUE;
}
/**
* @return The current Interceptor, or null if none set
*/
public abstract Interceptor getInterceptor();
public Interceptor getInterceptor()
{
return _contentProducer.getInterceptor();
}
/**
* Set the interceptor.
*
* @param interceptor The interceptor to use.
*/
public abstract void setInterceptor(Interceptor interceptor);
public void setInterceptor(Interceptor interceptor)
{
_contentProducer.setInterceptor(interceptor);
}
/**
* Set the {@link org.eclipse.jetty.server.HttpInput.Interceptor}, chaining it to the existing one if
* an {@link org.eclipse.jetty.server.HttpInput.Interceptor} is already set.
* Set the {@link Interceptor}, chaining it to the existing one if
* an {@link Interceptor} is already set.
*
* @param interceptor the next {@link org.eclipse.jetty.server.HttpInput.Interceptor} in a chain
* @param interceptor the next {@link Interceptor} in a chain
*/
public abstract void addInterceptor(Interceptor interceptor);
public void addInterceptor(Interceptor interceptor)
{
Interceptor currentInterceptor = _contentProducer.getInterceptor();
if (currentInterceptor == null)
_contentProducer.setInterceptor(interceptor);
else
_contentProducer.setInterceptor(new ChainedInterceptor(currentInterceptor, interceptor));
}
/**
* Called by channel when asynchronous IO needs to produce more content
*
* @throws IOException if unable to produce content
*/
public abstract void asyncReadProduce() throws IOException;
public void asyncReadProduce()
{
if (LOG.isDebugEnabled())
LOG.debug("asyncReadProduce {}", _contentProducer);
produceContent();
}
/**
* Adds some content to this input stream.
*
* @param content the content to add
*/
public abstract void addContent(Content content);
public void addContent(Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("addContent {} {}", content, _contentProducer);
if (_firstByteTimeStamp == Long.MIN_VALUE)
{
_firstByteTimeStamp = System.nanoTime();
if (_firstByteTimeStamp == Long.MIN_VALUE)
_firstByteTimeStamp++;
}
_contentProducer.addContent(content);
if (isAsync() && _contentProducer.available(this::produceContent) > 0)
_channelState.onContentAdded();
}
public abstract boolean hasContent();
public boolean hasContent()
{
return _contentProducer.hasRawContent();
}
public abstract void unblock();
// There are 3 sources which can call this method in parallel:
// 1) HTTP2 read() that has a demand served on the app thread;
// 2) HTTP2 read() that has a demand served by a server thread;
// 3) onIdleTimeout called by a server thread;
// which means the semaphore can have up to 2 permits.
public void unblock()
{
if (LOG.isDebugEnabled())
LOG.debug("signalling blocked thread to wake up");
if (!isError() && !_eof.isEof() && _semaphore.availablePermits() > 1)
throw new AssertionError("Only one thread should call unblock and only if we are blocked");
_semaphore.release();
}
public abstract long getContentLength();
public long getContentLength()
{
return _contentProducer.getRawContentArrived();
}
public long getContentReceived()
{
@ -88,30 +179,623 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
*
* @return true if content channel woken for read
*/
public abstract boolean earlyEOF();
public boolean earlyEOF()
{
if (LOG.isDebugEnabled())
LOG.debug("received early EOF");
_eof = Eof.EARLY_EOF;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
/**
* This method should be called to signal that all the expected content arrived.
*
* @return true if content channel woken for read
*/
public abstract boolean eof();
public boolean eof()
{
if (LOG.isDebugEnabled())
LOG.debug("received EOF");
_eof = Eof.EOF;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
public abstract boolean consumeAll();
public boolean consumeAll()
{
if (LOG.isDebugEnabled())
LOG.debug("consume all");
_contentProducer.consumeTransformedContent(this::failContent, new IOException("Unconsumed content"));
if (_eof.isEof())
_eof = Eof.CONSUMED_EOF;
public abstract boolean isError();
if (isFinished())
return !isError();
public abstract boolean isAsync();
_eof = Eof.EARLY_EOF;
return false;
}
public abstract boolean onIdleTimeout(Throwable x);
public boolean isError()
{
return _error != null;
}
public abstract boolean failed(Throwable x);
public boolean isAsync()
{
return _readListener != null;
}
public boolean onIdleTimeout(Throwable x)
{
boolean neverDispatched = _channelState.isIdle();
boolean waitingForContent = _contentProducer.available(this::produceContent) == 0 && !_eof.isEof();
if ((waitingForContent || neverDispatched) && !isError())
{
x.addSuppressed(new Throwable("HttpInput idle timeout"));
_error = x;
if (isAsync())
return _channelState.onContentAdded();
unblock();
}
return false;
}
public boolean failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed " + x);
if (_error != null && _error != x)
_error.addSuppressed(x);
else
_error = x;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
/* ServletInputStream */
@Override
public abstract int read(byte[] b, int off, int len) throws IOException;
public boolean isFinished()
{
boolean finished = !_contentProducer.hasRawContent() && _eof.isConsumed();
if (LOG.isDebugEnabled())
LOG.debug("isFinished? {}", finished);
return finished;
}
@Override
public abstract int available() throws IOException;
public boolean isReady()
{
// calling _contentProducer.available() might change the _eof state, so the following test order matters
if (_contentProducer.available(this::produceContent) > 0 || _eof.isEof())
{
if (LOG.isDebugEnabled())
LOG.debug("isReady? true");
return true;
}
if (LOG.isDebugEnabled())
LOG.debug("isReady? false");
_channelState.onReadUnready();
return false;
}
@Override
public void setReadListener(ReadListener readListener)
{
if (_readListener != null)
throw new IllegalStateException("ReadListener already set");
_readListener = Objects.requireNonNull(readListener);
//illegal if async not started
if (!_channelState.isAsyncStarted())
throw new IllegalStateException("Async not started");
if (LOG.isDebugEnabled())
LOG.debug("setReadListener error=" + _error + " eof=" + _eof + " " + _contentProducer);
boolean woken;
if (isError())
{
woken = _channelState.onReadReady();
}
else
{
if (_contentProducer.available(this::produceContent) > 0)
{
woken = _channelState.onReadReady();
}
else if (_eof.isEof())
{
woken = _channelState.onReadEof();
}
else
{
_channelState.onReadUnready();
woken = false;
}
}
if (LOG.isDebugEnabled())
LOG.debug("setReadListener woken=" + woken);
if (woken)
scheduleReadListenerNotification();
}
private void scheduleReadListenerNotification()
{
HttpChannel channel = _channelState.getHttpChannel();
channel.execute(channel);
}
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
if (read == 0)
throw new IOException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
// Calculate minimum request rate for DOS protection
long minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate();
if (minRequestDataRate > 0 && _firstByteTimeStamp != Long.MIN_VALUE)
{
long period = System.nanoTime() - _firstByteTimeStamp;
if (period > 0)
{
long minimumData = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1);
if (_contentProducer.getRawContentArrived() < minimumData)
{
BadMessageException bad = new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,
String.format("Request content data rate < %d B/s", minRequestDataRate));
if (_channelState.isResponseCommitted())
_channelState.getHttpChannel().abort(bad);
throw bad;
}
}
}
while (true)
{
// The semaphore's permits must be drained before we call read() because:
// 1) _contentProducer.read() may call unblock() which enqueues a permit even if the content was produced
// by the exact thread that called HttpInput.read(), hence leaving around an unconsumed permit that would
// be consumed the next time HttpInput.read() is called, mistakenly believing that content was produced.
// 2) HTTP2 demand served asynchronously does call unblock which does enqueue a permit in the semaphore;
// this permit would then be mistakenly consumed by the next call to blockForContent() once all the produced
// content got consumed.
if (!isAsync())
_semaphore.drainPermits();
int read = _contentProducer.read(this::produceContent, b, off, len);
if (LOG.isDebugEnabled())
LOG.debug("read produced {} byte(s)", read);
if (read > 0)
return read;
if (LOG.isDebugEnabled())
LOG.debug("read error = " + _error);
if (_error != null)
throw new IOException(_error);
if (LOG.isDebugEnabled())
LOG.debug("read EOF = {}", _eof);
if (_eof.isEarly())
throw new EofException("Early EOF");
if (LOG.isDebugEnabled())
LOG.debug("read async = {}", isAsync());
if (!isAsync())
{
if (_eof.isEof())
{
_eof = Eof.CONSUMED_EOF;
if (LOG.isDebugEnabled())
LOG.debug("read on EOF, switching to CONSUMED_EOF and returning");
return -1;
}
if (LOG.isDebugEnabled())
LOG.debug("read blocked");
blockForContent();
if (LOG.isDebugEnabled())
LOG.debug("read unblocked");
}
else
{
if (_eof.isEof())
{
_eof = Eof.CONSUMED_EOF;
boolean wasInAsyncWait = _channelState.onReadEof();
if (LOG.isDebugEnabled())
LOG.debug("async read on EOF (was in async wait? {}), switching to CONSUMED_EOF and returning", wasInAsyncWait);
if (wasInAsyncWait)
scheduleReadListenerNotification();
return -1;
}
else
{
//TODO returning 0 breaks the InputStream contract. Shouldn't IOException be thrown instead?
_channelState.getHttpChannel().onAsyncWaitForContent(); // switches on fill interested
return 0;
}
}
}
}
@Override
public int available()
{
int available = _contentProducer.available(this::produceContent);
if (LOG.isDebugEnabled())
LOG.debug("available = {}", available);
return available;
}
private void blockForContent()
{
try
{
_channelState.getHttpChannel().onBlockWaitForContent(); // switches on fill interested
if (LOG.isDebugEnabled())
LOG.debug("waiting for signal to wake up");
_semaphore.acquire();
if (LOG.isDebugEnabled())
LOG.debug("signalled to wake up");
}
catch (Throwable x)
{
_channelState.getHttpChannel().onBlockWaitForContentFailure(x);
}
}
/* Runnable */
/*
* <p> While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link
* ContextHandler#handle(Runnable)} to setup classloaders etc. </p>
*/
@Override
public void run()
{
if (!_contentProducer.hasRawContent())
{
if (LOG.isDebugEnabled())
LOG.debug("running has no raw content; error: {}, EOF = {}", _error, _eof);
if (_error != null || _eof.isEarly())
{
// TODO is this necessary to add here?
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
if (_error != null)
_readListener.onError(_error);
else
_readListener.onError(new EofException("Early EOF"));
}
else if (_eof.isEof())
{
try
{
_readListener.onAllDataRead();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("running failed onAllDataRead", x);
_readListener.onError(x);
}
}
// else: !hasContent() && !error && !EOF -> no-op
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("running has raw content");
try
{
_readListener.onDataAvailable();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("running failed onDataAvailable", x);
_readListener.onError(x);
}
}
}
private void produceContent()
{
if (LOG.isDebugEnabled())
LOG.debug("produceContent {}", _contentProducer);
_channelState.getHttpChannel().produceContent();
}
private void failContent(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failContent {} - " + failure, _contentProducer);
_channelState.getHttpChannel().failContent(failure);
}
private enum Eof
{
NOT_YET(false, false, false),
EOF(true, false, false),
CONSUMED_EOF(true, true, false),
EARLY_EOF(true, false, true),
;
private final boolean _eof;
private final boolean _consumed;
private final boolean _early;
Eof(boolean eof, boolean consumed, boolean early)
{
_eof = eof;
_consumed = consumed;
_early = early;
}
boolean isEof()
{
return _eof;
}
boolean isConsumed()
{
return _consumed;
}
boolean isEarly()
{
return _early;
}
}
// All methods of this class have to be synchronized because a HTTP2 reset can call consumeTransformedContent()
// while nextNonEmptyContent() is executing, hence all accesses to _rawContent and _transformedContent must be
// mutually excluded.
// TODO: maybe the locking could be more fine grained, by only protecting the if (null|!null) blocks?
private static class ContentProducer
{
// Note: _rawContent can never be null for as long as _transformedContent is not null.
private Content _rawContent;
private Content _transformedContent;
private long _rawContentArrived;
private Interceptor _interceptor;
private Throwable _consumeFailure;
void recycle()
{
synchronized (this)
{
if (LOG.isDebugEnabled())
LOG.debug("recycle {}", this);
if (_transformedContent == _rawContent)
_transformedContent = null;
if (_transformedContent != null)
_transformedContent.failed(null);
_transformedContent = null;
if (_rawContent != null)
_rawContent.failed(null);
_rawContent = null;
_rawContentArrived = 0L;
if (_interceptor instanceof Destroyable)
((Destroyable)_interceptor).destroy();
_interceptor = null;
_consumeFailure = null;
}
}
long getRawContentArrived()
{
synchronized (this)
{
return _rawContentArrived;
}
}
boolean hasRawContent()
{
synchronized (this)
{
return _rawContent != null;
}
}
Interceptor getInterceptor()
{
synchronized (this)
{
return _interceptor;
}
}
void setInterceptor(Interceptor interceptor)
{
synchronized (this)
{
this._interceptor = interceptor;
}
}
void addContent(Content content)
{
synchronized (this)
{
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, content);
if (content == null)
throw new AssertionError("Cannot add null content");
if (_consumeFailure != null)
{
content.failed(_consumeFailure);
return;
}
if (_rawContent != null)
throw new AssertionError("Cannot add new content while current one hasn't been processed");
_rawContent = content;
_rawContentArrived += content.remaining();
}
}
void consumeTransformedContent(Consumer<Throwable> failRawContent, Throwable failure)
{
synchronized (this)
{
if (LOG.isDebugEnabled())
LOG.debug("{} consumeTransformedContent", this);
// start by depleting the current _transformedContent
if (_transformedContent != null)
{
_transformedContent.skip(_transformedContent.remaining());
if (_transformedContent != _rawContent)
_transformedContent.failed(failure);
_transformedContent = null;
}
// don't bother transforming content, directly deplete the raw one
if (_rawContent != null)
{
_rawContent.skip(_rawContent.remaining());
_rawContent.failed(failure);
_rawContent = null;
}
// fail whatever other content the producer may have
_consumeFailure = failure;
failRawContent.accept(failure);
}
}
int available(Runnable rawContentProducer)
{
synchronized (this)
{
Content content = nextNonEmptyContent(rawContentProducer);
return content == null ? 0 : content.remaining();
}
}
int read(Runnable rawContentProducer, byte[] b, int off, int len)
{
synchronized (this)
{
if (LOG.isDebugEnabled())
LOG.debug("{} read", this);
Content content = nextNonEmptyContent(rawContentProducer);
return content == null ? 0 : content.get(b, off, len);
}
}
private Content nextNonEmptyContent(Runnable rawContentProducer)
{
if (_rawContent == null)
{
rawContentProducer.run();
if (_rawContent == null)
return null;
}
if (_transformedContent != null && _transformedContent.isEmpty())
{
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
_transformedContent = null;
}
while (_transformedContent == null)
{
if (_interceptor != null)
_transformedContent = _interceptor.readFrom(_rawContent);
else
_transformedContent = _rawContent;
if (_transformedContent != null && _transformedContent.isEmpty())
{
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
_transformedContent = null;
}
if (_transformedContent == null)
{
if (_rawContent.isEmpty())
{
_rawContent.succeeded();
_rawContent = null;
rawContentProducer.run();
if (_rawContent == null)
return null;
}
}
}
return _transformedContent;
}
@Override
public String toString()
{
return getClass().getSimpleName() + "[i=" + _interceptor + ",b=" + _rawContentArrived +
",r=" + _rawContent + ",t=" + _transformedContent + "]";
}
}
/**
* An {@link Interceptor} that chains two other {@link Interceptor}s together.
* The {@link Interceptor#readFrom(Content)} calls the previous {@link Interceptor}'s
* {@link Interceptor#readFrom(Content)} and then passes any {@link Content} returned
* to the next {@link Interceptor}.
*/
private static class ChainedInterceptor implements Interceptor, Destroyable
{
private final Interceptor _prev;
private final Interceptor _next;
ChainedInterceptor(Interceptor prev, Interceptor next)
{
_prev = prev;
_next = next;
}
Interceptor getPrev()
{
return _prev;
}
Interceptor getNext()
{
return _next;
}
@Override
public Content readFrom(Content content)
{
Content c = getPrev().readFrom(content);
if (c == null)
return null;
return getNext().readFrom(c);
}
@Override
public void destroy()
{
if (_prev instanceof Destroyable)
((Destroyable)_prev).destroy();
if (_next instanceof Destroyable)
((Destroyable)_next).destroy();
}
}
public interface Interceptor
{

View File

@ -1,53 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
// tests used: RequestTest, PartialRFC2616Test, AsyncRequestReadTest, AsyncIOServletTest, GzipHandlerTest
public class HttpInputOverHTTP extends AbstractHttpInput
{
public HttpInputOverHTTP(HttpChannelState state)
{
super(state);
}
@Override
public void addContent(Content content)
{
super.addContent(content);
}
@Override
protected void produceRawContent()
{
((HttpConnection)_channelState.getHttpChannel().getEndPoint().getConnection()).parseAndFillForContent();
}
@Override
protected void failRawContent(Throwable failure)
{
while (true)
{
if (!_contentProducer.hasRawContent())
_contentProducer.produceRawContent();
if (!_contentProducer.hasRawContent())
break;
_contentProducer.consumeRawContent();
}
}
}

View File

@ -49,9 +49,13 @@ public class HttpWriterTest
HttpChannel channel = new HttpChannel(new MockConnector(), new HttpConfiguration(), null, null)
{
@Override
protected HttpInput newHttpInput(HttpChannelState state)
public void produceContent()
{
}
@Override
public void failContent(Throwable failure)
{
return null;
}
@Override

View File

@ -178,9 +178,13 @@ public class ResponseTest
})
{
@Override
protected HttpInput newHttpInput(HttpChannelState state)
public void produceContent()
{
}
@Override
public void failContent(Throwable failure)
{
return new HttpInputOverHTTP(state);
}
};
}

View File

@ -36,6 +36,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPOutputStream;
import javax.servlet.AsyncContext;
import javax.servlet.DispatcherType;
import javax.servlet.ReadListener;
@ -71,6 +72,7 @@ import org.eclipse.jetty.server.HttpInput.Content;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FuturePromise;
import org.hamcrest.Matchers;
@ -1204,8 +1206,8 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
{
case 0:
// null transform
if (content.isEmpty())
state++;
content.skip(content.remaining());
state++;
return null;
case 1:
@ -1254,7 +1256,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
}
default:
return null;
return content;
}
}
});
@ -1300,7 +1302,6 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
CountDownLatch clientLatch = new CountDownLatch(1);
String expected =
"S0" +
"S1" +
"S2" +
"S3S3" +
@ -1345,6 +1346,249 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testAsyncInterceptedTwice(Transport transport) throws Exception
{
init(transport);
scenario.start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
System.err.println("Service " + request);
final HttpInput httpInput = ((Request)request).getHttpInput();
httpInput.addInterceptor(new GzipHttpInputInterceptor(((Request)request).getHttpChannel().getByteBufferPool(), 1024));
httpInput.addInterceptor(content ->
{
ByteBuffer byteBuffer = content.getByteBuffer();
byte[] bytes = new byte[2];
bytes[1] = byteBuffer.get();
bytes[0] = byteBuffer.get();
return new Content(wrap(bytes));
});
AsyncContext asyncContext = request.startAsync();
ServletInputStream input = request.getInputStream();
ByteArrayOutputStream out = new ByteArrayOutputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
while (input.isReady())
{
int b = input.read();
if (b > 0)
{
// System.err.printf("0x%2x %s %n", b, Character.isISOControl(b)?"?":(""+(char)b));
out.write(b);
}
else if (b < 0)
return;
}
}
@Override
public void onAllDataRead() throws IOException
{
response.getOutputStream().write(out.toByteArray());
asyncContext.complete();
}
@Override
public void onError(Throwable x)
{
}
});
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch clientLatch = new CountDownLatch(1);
String expected =
"0S" +
"1S" +
"2S" +
"3S" +
"4S" +
"5S" +
"6S";
scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
.path(scenario.servletPath)
.content(contentProvider)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
{
Response response = result.getResponse();
assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200));
assertThat(getContentAsString(), Matchers.equalTo(expected));
clientLatch.countDown();
}
}
});
contentProvider.offer(gzipToBuffer("S0"));
contentProvider.flush();
contentProvider.offer(gzipToBuffer("S1"));
contentProvider.flush();
contentProvider.offer(gzipToBuffer("S2"));
contentProvider.flush();
contentProvider.offer(gzipToBuffer("S3"));
contentProvider.flush();
contentProvider.offer(gzipToBuffer("S4"));
contentProvider.flush();
contentProvider.offer(gzipToBuffer("S5"));
contentProvider.flush();
contentProvider.offer(gzipToBuffer("S6"));
contentProvider.close();
assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testAsyncInterceptedTwiceWithNulls(Transport transport) throws Exception
{
init(transport);
scenario.start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
System.err.println("Service " + request);
final HttpInput httpInput = ((Request)request).getHttpInput();
httpInput.addInterceptor(content ->
{
if (content.isEmpty())
return content;
// skip contents with odd numbers
ByteBuffer duplicate = content.getByteBuffer().duplicate();
duplicate.get();
byte integer = duplicate.get();
int idx = Character.getNumericValue(integer);
Content contentCopy = new Content(content.getByteBuffer().duplicate());
content.skip(content.remaining());
if (idx % 2 == 0)
return contentCopy;
return null;
});
httpInput.addInterceptor(content ->
{
if (content.isEmpty())
return content;
// reverse the bytes
ByteBuffer byteBuffer = content.getByteBuffer();
byte[] bytes = new byte[2];
bytes[1] = byteBuffer.get();
bytes[0] = byteBuffer.get();
return new Content(wrap(bytes));
});
AsyncContext asyncContext = request.startAsync();
ServletInputStream input = request.getInputStream();
ByteArrayOutputStream out = new ByteArrayOutputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
while (input.isReady())
{
int b = input.read();
if (b > 0)
{
// System.err.printf("0x%2x %s %n", b, Character.isISOControl(b)?"?":(""+(char)b));
out.write(b);
}
else if (b < 0)
return;
}
}
@Override
public void onAllDataRead() throws IOException
{
response.getOutputStream().write(out.toByteArray());
asyncContext.complete();
}
@Override
public void onError(Throwable x)
{
}
});
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch clientLatch = new CountDownLatch(1);
String expected =
"0S" +
"2S" +
"4S" +
"6S";
scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
.path(scenario.servletPath)
.content(contentProvider)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
{
Response response = result.getResponse();
assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200));
assertThat(getContentAsString(), Matchers.equalTo(expected));
clientLatch.countDown();
}
}
});
contentProvider.offer(BufferUtil.toBuffer("S0"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S1"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S2"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S3"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S4"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S5"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S6"));
contentProvider.close();
assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
}
private ByteBuffer gzipToBuffer(String s) throws IOException
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream gzos = new GZIPOutputStream(baos);
gzos.write(s.getBytes(StandardCharsets.ISO_8859_1));
gzos.close();
return BufferUtil.toBuffer(baos.toByteArray());
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testWriteListenerFromOtherThread(Transport transport) throws Exception