http2 impl (Milestone 2)

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2020-02-11 17:02:55 +01:00
parent 5a24c8dd17
commit 946f334810
16 changed files with 1230 additions and 898 deletions

View File

@ -235,6 +235,22 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING || state == CloseState.CLOSED;
}
@Override
public void fail(Throwable x)
{
try (AutoLock l = lock.lock())
{
dataDemand = Long.MIN_VALUE;
while (true)
{
DataEntry dataEntry = dataQueue.poll();
if (dataEntry == null)
break;
dataEntry.callback.failed(x);
}
}
}
public boolean isLocallyClosed()
{
return closeState.get() == CloseState.LOCALLY_CLOSED;
@ -400,6 +416,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
DataEntry entry = new DataEntry(frame, callback);
try (AutoLock l = lock.lock())
{
if (dataDemand == Long.MIN_VALUE)
{
// stream has been failed
callback.failed(null);
return;
}
dataQueue.offer(entry);
initial = dataInitial;
if (initial)
@ -439,6 +461,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
boolean proceed = false;
try (AutoLock l = lock.lock())
{
if (dataDemand == Long.MIN_VALUE)
return; // stream has been failed
demand = dataDemand = MathUtils.cappedAdd(dataDemand, n);
if (!dataProcess)
dataProcess = proceed = !dataQueue.isEmpty();

View File

@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
public abstract class HTTP2StreamEndPoint implements EndPoint
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP2StreamEndPoint.class);
private static final Throwable EOF = new Throwable();
private final AutoLock lock = new AutoLock();
private final Deque<Entry> dataQueue = new ArrayDeque<>();
@ -530,7 +531,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
{
if (buffer.hasRemaining())
offer(buffer, Callback.from(Callback.NOOP::succeeded, callback::failed), null);
offer(BufferUtil.EMPTY_BUFFER, callback, Entry.EOF);
offer(BufferUtil.EMPTY_BUFFER, callback, EOF);
}
else
{
@ -581,10 +582,8 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
writeState);
}
private static class Entry
private class Entry
{
private static final Throwable EOF = new Throwable();
private final ByteBuffer buffer;
private final Callback callback;
private final Throwable failure;
@ -611,6 +610,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
private void succeed()
{
callback.succeeded();
stream.demand(1);
}
private void fail(Throwable failure)

View File

@ -126,6 +126,8 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isResetOrFailed();
void fail(Throwable x);
/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/

View File

@ -242,7 +242,10 @@ public interface Stream
* @param callback the callback to complete when the bytes of the DATA frame have been consumed
* @see #onDataDemanded(Stream, DataFrame, Callback)
*/
public void onData(Stream stream, DataFrame frame, Callback callback);
public default void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
}
/**
* <p>Callback method invoked when a DATA frame has been demanded.</p>

View File

@ -157,7 +157,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
public void onDataDemanded(Stream stream, DataFrame frame, Callback callback)
{
getConnection().onData((IStream)stream, frame, callback);
}

View File

@ -41,6 +41,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.handler.ContextHandler;
@ -103,6 +104,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
getResponse().getHttpOutput().onFlushed(bytes);
}
@Override
protected HttpInput newHttpInput(HttpChannelState state)
{
return new HttpInputOverHTTP2(state);
}
public Runnable onRequest(HeadersFrame frame)
{
try
@ -131,9 +138,17 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
_delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() &&
!endStream && !_expect100Continue && !connect;
// Delay the demand of DATA frames for CONNECT with :protocol.
if (!connect || request.getProtocol() == null)
// Delay the demand of DATA frames for CONNECT with :protocol
// or for normal requests expecting 100 continue.
if (!connect)
{
if (!_expect100Continue)
getStream().demand(1);
}
else if (request.getProtocol() == null)
{
getStream().demand(1);
}
if (LOG.isDebugEnabled())
{

View File

@ -0,0 +1,70 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.server;
import org.eclipse.jetty.server.AbstractLockedHttpInput;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.util.thread.AutoLock;
public class HttpInputOverHTTP2 extends AbstractLockedHttpInput
{
private boolean _producing;
public HttpInputOverHTTP2(HttpChannelState state)
{
super(state);
}
@Override
public void recycle()
{
try (AutoLock lock = _contentLock.lock())
{
super.recycle();
_producing = false;
}
}
@Override
public boolean addContent(Content content)
{
try (AutoLock lock = _contentLock.lock())
{
boolean b = super.addContent(content);
_producing = false;
return b;
}
}
@Override
protected void produceRawContent()
{
if (!_producing)
{
_producing = true;
((HttpChannelOverHTTP2)_channelState.getHttpChannel()).getStream().demand(1);
}
}
@Override
protected void failRawContent(Throwable failure)
{
((HttpChannelOverHTTP2)_channelState.getHttpChannel()).getStream().fail(failure);
}
}

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.server.Authentication;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
@ -62,6 +63,12 @@ public class SpnegoAuthenticatorTest
return null;
}
@Override
protected HttpInput newHttpInput(HttpChannelState state)
{
return null;
}
@Override
protected HttpOutput newHttpOutput()
{
@ -97,6 +104,12 @@ public class SpnegoAuthenticatorTest
return null;
}
@Override
protected HttpInput newHttpInput(HttpChannelState state)
{
return null;
}
@Override
protected HttpOutput newHttpOutput()
{

View File

@ -0,0 +1,769 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import javax.servlet.ReadListener;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.AutoLock;
public abstract class AbstractHttpInput extends HttpInput
{
private static final Logger LOG = Log.getLogger(AbstractHttpInput.class);
private final byte[] _oneByteBuffer = new byte[1];
protected final HttpChannelState _channelState;
protected final ContentProducer _contentProducer;
protected final AutoLock _contentLock = new AutoLock();
protected final Condition _contentLockCondition = _contentLock.newCondition();
private Eof _eof = Eof.NOT_YET;
private Throwable _error;
private ReadListener _readListener;
private long _firstByteTimeStamp = Long.MIN_VALUE;
public AbstractHttpInput(HttpChannelState state)
{
_channelState = state;
_contentProducer = new ContentProducer(this::produceRawContent);
}
/* HttpInput */
@Override
public void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycle");
_contentProducer.recycle();
_eof = Eof.NOT_YET;
_error = null;
_readListener = null;
_firstByteTimeStamp = Long.MIN_VALUE;
}
@Override
public Interceptor getInterceptor()
{
return _contentProducer.getInterceptor();
}
@Override
public void setInterceptor(Interceptor interceptor)
{
_contentProducer.setInterceptor(interceptor);
}
@Override
public void addInterceptor(Interceptor interceptor)
{
_contentProducer.addInterceptor(interceptor);
}
@Override
public void asyncReadProduce()
{
if (LOG.isDebugEnabled())
LOG.debug("asyncReadProduce {}", _contentProducer);
_contentProducer.produceRawContent();
}
@Override
public boolean addContent(Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("addContent {} {}", content, _contentProducer);
if (_firstByteTimeStamp == Long.MIN_VALUE)
{
_firstByteTimeStamp = System.nanoTime();
if (_firstByteTimeStamp == Long.MIN_VALUE)
_firstByteTimeStamp++;
}
_contentProducer.addContent(content);
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
@Override
public boolean hasContent()
{
return _contentProducer.hasRawContent();
}
@Override
public void unblock()
{
try (AutoLock lock = _contentLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("signalling blocked thread to wake up");
_contentLockCondition.signal();
}
}
@Override
public long getContentLength()
{
return _contentProducer.getRawContentArrived();
}
@Override
public boolean earlyEOF()
{
if (LOG.isDebugEnabled())
LOG.debug("received early EOF");
_eof = Eof.EARLY_EOF;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
@Override
public boolean eof()
{
if (LOG.isDebugEnabled())
LOG.debug("received EOF");
_eof = Eof.EOF;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
@Override
public boolean consumeAll()
{
if (LOG.isDebugEnabled())
LOG.debug("consume all");
_contentProducer.consumeTransformedContent(() -> failRawContent(new IOException("Unconsumed content")));
if (_eof.isEof())
_eof = Eof.CONSUMED_EOF;
if (isFinished())
return !isError();
_eof = Eof.EARLY_EOF;
return false;
}
@Override
public boolean isError()
{
return _error != null;
}
@Override
public boolean isAsync()
{
return _readListener != null;
}
@Override
public boolean onIdleTimeout(Throwable x)
{
boolean neverDispatched = _channelState.isIdle();
boolean waitingForContent = _contentProducer.available() == 0 && !_eof.isEof();
if ((waitingForContent || neverDispatched) && !isError())
{
x.addSuppressed(new Throwable("HttpInput idle timeout"));
_error = x;
if (isAsync())
return _channelState.onContentAdded();
unblock();
}
return false;
}
@Override
public boolean failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed " + x);
if (_error != null)
_error.addSuppressed(x);
else
_error = x;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
/* ServletInputStream */
@Override
public boolean isFinished()
{
boolean finished = !_contentProducer.hasRawContent() && _eof.isConsumed();
if (LOG.isDebugEnabled())
LOG.debug("isFinished? {}", finished);
return finished;
}
@Override
public boolean isReady()
{
// calling _contentProducer.available() might change the _eof state, so the following test order matters
if (_contentProducer.available() > 0 || _eof.isEof())
{
if (LOG.isDebugEnabled())
LOG.debug("isReady? true");
return true;
}
if (LOG.isDebugEnabled())
LOG.debug("isReady? false");
_channelState.onReadUnready();
return false;
}
@Override
public void setReadListener(ReadListener readListener)
{
if (_readListener != null)
throw new IllegalStateException("ReadListener already set");
_readListener = Objects.requireNonNull(readListener);
if (LOG.isDebugEnabled())
LOG.debug("setReadListener error=" + _error + " eof=" + _eof + " " + _contentProducer);
boolean woken;
if (isError())
{
woken = _channelState.onReadReady();
}
else
{
if (_contentProducer.available() > 0)
{
woken = _channelState.onReadReady();
}
else if (_eof.isEof())
{
woken = _channelState.onReadEof();
}
else
{
_channelState.onReadUnready();
woken = false;
}
}
if (LOG.isDebugEnabled())
LOG.debug("setReadListener woken=" + woken);
if (woken)
scheduleReadListenerNotification();
}
private void scheduleReadListenerNotification()
{
HttpChannel channel = _channelState.getHttpChannel();
channel.execute(channel);
}
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
if (read == 0)
throw new IOException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
// Calculate minimum request rate for DOS protection
long minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate();
if (minRequestDataRate > 0 && _firstByteTimeStamp != Long.MIN_VALUE)
{
long period = System.nanoTime() - _firstByteTimeStamp;
if (period > 0)
{
long minimumData = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1);
if (_contentProducer.getRawContentArrived() < minimumData)
{
BadMessageException bad = new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,
String.format("Request content data rate < %d B/s", minRequestDataRate));
if (_channelState.isResponseCommitted())
_channelState.getHttpChannel().abort(bad);
throw bad;
}
}
}
while (true)
{
int read = _contentProducer.read(b, off, len);
if (LOG.isDebugEnabled())
LOG.debug("read produced {} byte(s)", read);
if (read > 0)
return read;
if (LOG.isDebugEnabled())
LOG.debug("read error = " + _error);
if (_error != null)
throw new IOException(_error);
if (LOG.isDebugEnabled())
LOG.debug("read EOF = {}", _eof);
if (_eof.isEarly())
throw new EofException("Early EOF");
if (LOG.isDebugEnabled())
LOG.debug("read async = {}", isAsync());
if (!isAsync())
{
if (_eof.isEof())
{
_eof = Eof.CONSUMED_EOF;
if (LOG.isDebugEnabled())
LOG.debug("read on EOF, switching to CONSUMED_EOF and returning");
return -1;
}
if (LOG.isDebugEnabled())
LOG.debug("read blocked");
blockForContent();
if (LOG.isDebugEnabled())
LOG.debug("read unblocked");
}
else
{
if (_eof.isEof())
{
boolean wasInAsyncWait = _channelState.onReadEof();
if (wasInAsyncWait)
scheduleReadListenerNotification();
if (LOG.isDebugEnabled())
LOG.debug("async read on EOF (was in async wait? {}), switching to CONSUMED_EOF and returning", wasInAsyncWait);
_eof = Eof.CONSUMED_EOF;
return -1;
}
else
{
//TODO returning 0 breaks the InputStream contract. Shouldn't IOException be thrown instead?
_channelState.getHttpChannel().onAsyncWaitForContent(); // switches on fill interested
return 0;
}
}
}
}
@Override
public int available()
{
int available = _contentProducer.available();
if (LOG.isDebugEnabled())
LOG.debug("available = {}", available);
return available;
}
private void blockForContent()
{
try (AutoLock lock = _contentLock.lock())
{
_channelState.getHttpChannel().onBlockWaitForContent(); // switches on fill interested
if (LOG.isDebugEnabled())
LOG.debug("waiting for signal to wake up");
_contentLockCondition.await();
if (LOG.isDebugEnabled())
LOG.debug("signalled to wake up");
}
catch (Throwable x)
{
_channelState.getHttpChannel().onBlockWaitForContentFailure(x);
}
}
/* Runnable */
/*
* <p> While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link
* ContextHandler#handle(Runnable)} to setup classloaders etc. </p>
*/
@Override
public void run()
{
if (!_contentProducer.hasRawContent())
{
if (LOG.isDebugEnabled())
LOG.debug("running has no raw content; error: {}, EOF = {}", _error, _eof);
if (_error != null || _eof.isEarly())
{
// TODO is this necessary to add here?
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
if (_error != null)
_readListener.onError(_error);
else
_readListener.onError(new EofException("Early EOF"));
}
else if (_eof.isEof())
{
try
{
_readListener.onAllDataRead();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("running failed onAllDataRead", x);
_readListener.onError(x);
}
}
// else: !hasContent() && !error && !EOF -> no-op
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("running has raw content");
try
{
_readListener.onDataAvailable();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("running failed onDataAvailable", x);
_readListener.onError(x);
}
}
}
protected abstract void produceRawContent();
protected abstract void failRawContent(Throwable failure);
/**
* An {@link Interceptor} that chains two other {@link Interceptor}s together.
* The {@link Interceptor#readFrom(Content)} calls the previous {@link Interceptor}'s
* {@link Interceptor#readFrom(Content)} and then passes any {@link Content} returned
* to the next {@link Interceptor}.
*/
static class ChainedInterceptor implements Interceptor, Destroyable
{
private final Interceptor _prev;
private final Interceptor _next;
public ChainedInterceptor(Interceptor prev, Interceptor next)
{
_prev = prev;
_next = next;
}
public Interceptor getPrev()
{
return _prev;
}
public Interceptor getNext()
{
return _next;
}
@Override
public Content readFrom(Content content)
{
return getNext().readFrom(getPrev().readFrom(content));
}
@Override
public void destroy()
{
if (_prev instanceof Destroyable)
((Destroyable)_prev).destroy();
if (_next instanceof Destroyable)
((Destroyable)_next).destroy();
}
}
enum Eof
{
NOT_YET(false, false, false),
EOF(true, false, false),
CONSUMED_EOF(true, true, false),
EARLY_EOF(true, false, true),
;
private final boolean _eof;
private final boolean _consumed;
private final boolean _early;
Eof(boolean eof, boolean consumed, boolean early)
{
_eof = eof;
_consumed = consumed;
_early = early;
}
public boolean isEof()
{
return _eof;
}
public boolean isConsumed()
{
return _consumed;
}
public boolean isEarly()
{
return _early;
}
}
protected static class ContentProducer
{
private final Runnable _rawContentProducer;
// Note: _rawContent can never be null for as long as _transformedContent is not null.
private Content _rawContent;
private Content _transformedContent;
private long _rawContentArrived;
private Interceptor _interceptor;
private boolean _allConsumed;
public ContentProducer(Runnable rawContentProducer)
{
_rawContentProducer = rawContentProducer;
}
void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycle {}", this);
if (_transformedContent == _rawContent)
_transformedContent = null;
if (_transformedContent != null && !_transformedContent.isEmpty())
_transformedContent.failed(null);
_transformedContent = null;
if (_rawContent != null && !_rawContent.isEmpty())
_rawContent.failed(null);
_rawContent = null;
_rawContentArrived = 0L;
if (_interceptor instanceof Destroyable)
((Destroyable)_interceptor).destroy();
_interceptor = null;
_allConsumed = false;
}
int available()
{
if (_transformedContent != null)
return _transformedContent.remaining();
if (_rawContent == null)
produceRawContent();
produceTransformedContent();
return _transformedContent == null ? 0 : _transformedContent.remaining();
}
long getRawContentArrived()
{
return _rawContentArrived;
}
boolean hasRawContent()
{
return _rawContent != null;
}
Interceptor getInterceptor()
{
return _interceptor;
}
void setInterceptor(Interceptor interceptor)
{
this._interceptor = interceptor;
}
void addInterceptor(Interceptor interceptor)
{
if (_interceptor == null)
_interceptor = interceptor;
else
_interceptor = new ChainedInterceptor(_interceptor, interceptor);
}
void addContent(Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, content);
if (content == null)
throw new AssertionError("Cannot add null content");
if (_allConsumed)
{
content.failed(null);
return;
}
if (_rawContent != null)
throw new AssertionError("Cannot add new content while current one hasn't been processed");
_rawContent = content;
_rawContentArrived += content.remaining();
}
void consumeTransformedContent(Runnable failRawContent)
{
if (LOG.isDebugEnabled())
LOG.debug("{} consumeTransformedContent", this);
// start by depleting the current _transformedContent
if (_transformedContent != null)
{
_transformedContent.skip(_transformedContent.remaining());
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
_transformedContent = null;
}
// don't bother transforming content, directly deplete the raw one
consumeRawContent();
// fail whatever other content the producer may have
failRawContent.run();
_allConsumed = true;
}
void consumeRawContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} consumeRawContent", this);
if (_rawContent != null)
{
_rawContent.skip(_rawContent.remaining());
_rawContent.succeeded();
_rawContent = null;
}
}
int read(byte[] b, int off, int len)
{
if (LOG.isDebugEnabled())
LOG.debug("{} read", this);
while (_transformedContent == null)
{
if (_rawContent == null)
{
produceRawContent();
if (_rawContent == null)
return 0;
}
produceTransformedContent();
}
int read = _transformedContent.get(b, off, len);
if (_transformedContent.isEmpty())
produceTransformedContent(); //TODO: this should be something like cleanupTransformedContent() instead
return read;
}
/**
* Call the parser so that it's going to continue parsing of the request buffer, filling it with the socket's buffer
* if needed until either the request buffer is empty with no bytes left in the socket's buffer or {@link #addContent(Content)}
* is called.
*/
void produceRawContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produceRawContent", this);
_rawContentProducer.run();
}
/**
* Read {@code _rawContent} and {@code _transformedContent} to produce the next non-empty content to work with and store it in {@code _transformedContent},
* or store null in {@code _transformedContent} if there is no content to work with.
* Depleted content gets succeeded and its field nullified, which can happen for both {@code _rawContent} and {@code _transformedContent}.
*/
private void produceTransformedContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produceTransformedContent", this);
if (_interceptor == null)
{
// no interceptor set
if (_rawContent != null && _rawContent.isEmpty())
{
_rawContent.succeeded();
_rawContent = null;
_transformedContent = null;
}
else
{
_transformedContent = _rawContent;
}
}
else
{
// interceptor set
transformContent();
if (_transformedContent == null)
{
if (_rawContent != null && _rawContent.isEmpty())
{
_rawContent.succeeded();
_rawContent = null;
}
else
{
_transformedContent = _rawContent;
}
}
}
}
/**
* Read {@code _rawContent} and write {@code _transformedContent} to produce content using the interceptor.
* The produced content is guaranteed to either be null or not empty.
*/
private void transformContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} transformContent", this);
if (_rawContent == null)
return;
_transformedContent = _interceptor.readFrom(_rawContent);
if (_transformedContent != null && _transformedContent.isEmpty())
{
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
_transformedContent = null;
}
}
@Override
public String toString()
{
return getClass().getSimpleName() + "[i=" + _interceptor + ",b=" + _rawContentArrived +
",r=" + _rawContent + ",t=" + _transformedContent + "]";
}
}
}

View File

@ -0,0 +1,236 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import javax.servlet.ReadListener;
import org.eclipse.jetty.util.thread.AutoLock;
public abstract class AbstractLockedHttpInput extends AbstractHttpInput
{
public AbstractLockedHttpInput(HttpChannelState state)
{
super(state);
}
/* HttpInput */
@Override
public void recycle()
{
try (AutoLock lock = _contentLock.lock())
{
super.recycle();
}
}
@Override
public Interceptor getInterceptor()
{
try (AutoLock lock = _contentLock.lock())
{
return super.getInterceptor();
}
}
@Override
public void setInterceptor(Interceptor interceptor)
{
try (AutoLock lock = _contentLock.lock())
{
super.setInterceptor(interceptor);
}
}
@Override
public void addInterceptor(Interceptor interceptor)
{
try (AutoLock lock = _contentLock.lock())
{
super.addInterceptor(interceptor);
}
}
@Override
public void asyncReadProduce()
{
try (AutoLock lock = _contentLock.lock())
{
super.asyncReadProduce();
}
}
@Override
public boolean addContent(Content content)
{
try (AutoLock lock = _contentLock.lock())
{
return super.addContent(content);
}
}
@Override
public boolean hasContent()
{
try (AutoLock lock = _contentLock.lock())
{
return super.hasContent();
}
}
@Override
public long getContentLength()
{
try (AutoLock lock = _contentLock.lock())
{
return super.getContentLength();
}
}
@Override
public boolean earlyEOF()
{
try (AutoLock lock = _contentLock.lock())
{
return super.earlyEOF();
}
}
@Override
public boolean eof()
{
try (AutoLock lock = _contentLock.lock())
{
return super.eof();
}
}
@Override
public boolean consumeAll()
{
try (AutoLock lock = _contentLock.lock())
{
return super.consumeAll();
}
}
@Override
public boolean isError()
{
try (AutoLock lock = _contentLock.lock())
{
return super.isError();
}
}
@Override
public boolean isAsync()
{
try (AutoLock lock = _contentLock.lock())
{
return super.isAsync();
}
}
@Override
public boolean onIdleTimeout(Throwable x)
{
try (AutoLock lock = _contentLock.lock())
{
return super.onIdleTimeout(x);
}
}
@Override
public boolean failed(Throwable x)
{
try (AutoLock lock = _contentLock.lock())
{
return super.failed(x);
}
}
/* ServletInputStream */
@Override
public boolean isFinished()
{
try (AutoLock lock = _contentLock.lock())
{
return super.isFinished();
}
}
@Override
public boolean isReady()
{
try (AutoLock lock = _contentLock.lock())
{
return super.isReady();
}
}
@Override
public void setReadListener(ReadListener readListener)
{
try (AutoLock lock = _contentLock.lock())
{
super.setReadListener(readListener);
}
}
@Override
public int read() throws IOException
{
try (AutoLock lock = _contentLock.lock())
{
return super.read();
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
try (AutoLock lock = _contentLock.lock())
{
return super.read(b, off, len);
}
}
@Override
public int available()
{
try (AutoLock lock = _contentLock.lock())
{
return super.available();
}
}
/* Runnable */
@Override
public void run()
{
try (AutoLock lock = _contentLock.lock())
{
super.run();
}
}
}

View File

@ -64,7 +64,7 @@ import org.slf4j.LoggerFactory;
* HttpParser.RequestHandler callbacks. The completion of the active phase is signalled by a call to
* HttpTransport.completed().
*/
public class HttpChannel implements Runnable, HttpOutput.Interceptor
public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
{
public static Listener NOOP_LISTENER = new Listener() {};
private static final Logger LOG = LoggerFactory.getLogger(HttpChannel.class);
@ -119,11 +119,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
return _state.isSendError();
}
protected HttpInput newHttpInput(HttpChannelState state)
{
//TODO the HTTP2 impl instantiation should be in a subclass
return new HttpInputOverHTTP2(state);
}
protected abstract HttpInput newHttpInput(HttpChannelState state);
protected HttpOutput newHttpOutput()
{
@ -947,7 +943,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
return null;
}
protected void execute(Runnable task)
public void execute(Runnable task)
{
_executor.execute(task);
}

View File

@ -22,17 +22,19 @@ import java.io.IOException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import javax.servlet.ReadListener;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.AbstractHttpInput.Eof;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.AutoLock;
import static org.eclipse.jetty.server.HttpInputOverHTTP.Eof;
public class HttpInputOverFCGI extends HttpInput
{
private static final Logger LOG = Log.getLogger(HttpInputOverFCGI.class);
@ -47,6 +49,7 @@ public class HttpInputOverFCGI extends HttpInput
private Eof _eof = Eof.NOT_YET;
private Throwable _error;
private ReadListener _readListener;
private long _firstByteTimeStamp = Long.MIN_VALUE;
public HttpInputOverFCGI(HttpChannelState state)
{
@ -67,6 +70,7 @@ public class HttpInputOverFCGI extends HttpInput
_eof = Eof.NOT_YET;
_error = null;
_readListener = null;
_firstByteTimeStamp = Long.MIN_VALUE;
}
}
@ -115,6 +119,12 @@ public class HttpInputOverFCGI extends HttpInput
{
if (LOG.isDebugEnabled())
LOG.debug("addContent {} {}", content, _contentProducer);
if (_firstByteTimeStamp == Long.MIN_VALUE)
{
_firstByteTimeStamp = System.nanoTime();
if (_firstByteTimeStamp == Long.MIN_VALUE)
_firstByteTimeStamp++;
}
long contentArrived = _contentProducer.addContent(content);
long requestContentLength = _channelState.getHttpChannel().getRequest().getContentLengthLong();
// return false to make the parser go on, true to make it stop
@ -225,8 +235,20 @@ public class HttpInputOverFCGI extends HttpInput
@Override
public boolean onIdleTimeout(Throwable x)
{
//TODO implement me!
return false;
try (AutoLock lock = _contentLock.lock())
{
boolean neverDispatched = _channelState.isIdle();
boolean waitingForContent = !_contentProducer.hasTransformedContent() && !_eof.isEof();
if ((waitingForContent || neverDispatched) && !isError())
{
x.addSuppressed(new Throwable("HttpInput idle timeout"));
_error = x;
if (isAsync())
return _channelState.onContentAdded();
unblock();
}
return false;
}
}
@Override
@ -341,6 +363,25 @@ public class HttpInputOverFCGI extends HttpInput
{
try (AutoLock lock = _contentLock.lock())
{
// Calculate minimum request rate for DOS protection
long minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate();
if (minRequestDataRate > 0 && _firstByteTimeStamp != Long.MIN_VALUE)
{
long period = System.nanoTime() - _firstByteTimeStamp;
if (period > 0)
{
long minimumData = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1);
if (_contentProducer.getRawContentArrived() < minimumData)
{
BadMessageException bad = new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,
String.format("Request content data rate < %d B/s", minRequestDataRate));
if (_channelState.isResponseCommitted())
_channelState.getHttpChannel().abort(bad);
throw bad;
}
}
}
while (true)
{
int read = _contentProducer.read(b, off, len);
@ -439,10 +480,10 @@ public class HttpInputOverFCGI extends HttpInput
{
try (AutoLock lock = _contentLock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("running");
if (!_contentProducer.hasRawContent())
{
if (LOG.isDebugEnabled())
LOG.debug("running has no raw content; error: {}, EOF = {}", _error, _eof);
if (_error != null || _eof.isEarly())
{
// TODO is this necessary to add here?
@ -460,6 +501,8 @@ public class HttpInputOverFCGI extends HttpInput
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("running failed onAllDataRead", x);
_readListener.onError(x);
}
}
@ -467,12 +510,16 @@ public class HttpInputOverFCGI extends HttpInput
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("running has raw content");
try
{
_readListener.onDataAvailable();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("running failed onDataAvailable", x);
_readListener.onError(x);
}
}
@ -500,6 +547,7 @@ public class HttpInputOverFCGI extends HttpInput
if (_currentRawContent != null && !_currentRawContent.isEmpty())
_currentRawContent.failed(null);
_currentRawContent = null;
_rawContentQueue.forEach(c -> c.failed(null));
_rawContentQueue.clear();
_rawContentArrived = 0L;
if (_interceptor instanceof Destroyable)
@ -507,11 +555,15 @@ public class HttpInputOverFCGI extends HttpInput
_interceptor = null;
}
//TODO: factor out similarities with read and hasTransformedContent
int available()
{
if (_transformedContent != null)
return _transformedContent.remaining();
if (_currentRawContent == null)
produceRawContent();
return _currentRawContent == null ? 0 : _currentRawContent.remaining();
produceTransformedContent();
return _transformedContent == null ? 0 : _transformedContent.remaining();
}
long getRawContentArrived()
@ -521,6 +573,8 @@ public class HttpInputOverFCGI extends HttpInput
boolean hasRawContent()
{
if (_currentRawContent == null)
produceRawContent();
return _currentRawContent != null;
}

View File

@ -18,720 +18,37 @@
package org.eclipse.jetty.server;
import java.io.IOException;
import java.util.Objects;
import javax.servlet.ReadListener;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
// tests used: RequestTest, PartialRFC2616Test, AsyncRequestReadTest, AsyncIOServletTest, GzipHandlerTest
public class HttpInputOverHTTP extends HttpInput
public class HttpInputOverHTTP extends AbstractHttpInput
{
private static final Logger LOG = Log.getLogger(HttpInputOverHTTP.class);
private final byte[] _oneByteBuffer = new byte[1];
private final HttpChannelState _channelState;
private final NotifyingSemaphore _semaphore = new NotifyingSemaphore();
// TODO: think about thread visibility of the below variables
private final ContentProducer _contentProducer;
private Eof _eof = Eof.NOT_YET;
private Throwable _error;
private ReadListener _readListener;
public HttpInputOverHTTP(HttpChannelState state)
{
_channelState = state;
_contentProducer = new ContentProducer(() -> ((HttpConnection)state.getHttpChannel().getEndPoint().getConnection()).parseAndFillForContent());
}
/* HttpInput */
@Override
public void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycle");
_contentProducer.recycle();
_eof = Eof.NOT_YET;
_error = null;
_readListener = null;
}
@Override
public Interceptor getInterceptor()
{
return _contentProducer.getInterceptor();
}
@Override
public void setInterceptor(Interceptor interceptor)
{
_contentProducer.setInterceptor(interceptor);
}
@Override
public void addInterceptor(Interceptor interceptor)
{
_contentProducer.addInterceptor(interceptor);
}
@Override
public void asyncReadProduce()
{
if (LOG.isDebugEnabled())
LOG.debug("asyncReadProduce {}", _contentProducer);
_contentProducer.produceRawContent();
super(state);
}
@Override
public boolean addContent(Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("addContent {} {}", content, _contentProducer);
long contentArrived = _contentProducer.addContent(content);
long requestContentLength = _channelState.getHttpChannel().getRequest().getContentLengthLong();
// return false to make the parser go on, true to make it stop
// -> tell the parser to stop adding content, unless we have read everything
boolean stopParsing = requestContentLength == -1 || contentArrived < requestContentLength;
if (isAsync())
_channelState.onContentAdded();
return stopParsing;
super.addContent(content);
return true;
}
@Override
public boolean hasContent()
protected void produceRawContent()
{
return _contentProducer.hasRawContent();
((HttpConnection)_channelState.getHttpChannel().getEndPoint().getConnection()).parseAndFillForContent();
}
@Override
public void unblock()
{
if (LOG.isDebugEnabled())
LOG.debug("signalling blocked thread to wake up");
_semaphore.release();
}
@Override
public long getContentLength()
{
return _contentProducer.getRawContentArrived();
}
@Override
public boolean earlyEOF()
{
if (LOG.isDebugEnabled())
LOG.debug("received early EOF");
_eof = Eof.EARLY_EOF;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
@Override
public boolean eof()
{
if (LOG.isDebugEnabled())
LOG.debug("received EOF");
_eof = Eof.EOF;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
@Override
public boolean consumeAll()
{
if (LOG.isDebugEnabled())
LOG.debug("consume all");
_contentProducer.consumeAll();
if (_eof.isEof())
_eof = Eof.CONSUMED_EOF;
if (isFinished())
return !isError();
_eof = Eof.EARLY_EOF;
return false;
}
@Override
public boolean isError()
{
return _error != null;
}
@Override
public boolean isAsync()
{
return _readListener != null;
}
@Override
public boolean onIdleTimeout(Throwable x)
{
//TODO implement me!
return false;
}
@Override
public boolean failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed " + x);
if (_error != null)
_error.addSuppressed(x);
else
_error = x;
if (isAsync())
return _channelState.onContentAdded();
unblock();
return false;
}
/* ServletInputStream */
@Override
public boolean isFinished()
{
boolean finished = !_contentProducer.hasRawContent() && _eof.isConsumed();
if (LOG.isDebugEnabled())
LOG.debug("isFinished? {}", finished);
return finished;
}
@Override
public boolean isReady()
{
// calling _contentProducer.hasTransformedContent() might change the _eof state, so the following test order matters
if (_contentProducer.hasTransformedContent() || _eof.isEof())
{
if (LOG.isDebugEnabled())
LOG.debug("isReady? true");
return true;
}
if (LOG.isDebugEnabled())
LOG.debug("isReady? false");
_channelState.onReadUnready();
return false;
}
@Override
public void setReadListener(ReadListener readListener)
{
if (_readListener != null)
throw new IllegalStateException("ReadListener already set");
_readListener = Objects.requireNonNull(readListener);
if (LOG.isDebugEnabled())
LOG.debug("setReadListener error=" + _error + " eof=" + _eof + " " + _contentProducer);
boolean woken;
if (isError())
{
woken = _channelState.onReadReady();
}
else
{
if (_contentProducer.hasTransformedContent())
{
woken = _channelState.onReadReady();
}
else if (_eof.isEof())
{
woken = _channelState.onReadEof();
}
else
{
_channelState.onReadUnready();
woken = false;
}
}
if (LOG.isDebugEnabled())
LOG.debug("setReadListener woken=" + woken);
if (woken)
scheduleReadListenerNotification();
}
private void scheduleReadListenerNotification()
{
HttpChannel channel = _channelState.getHttpChannel();
channel.execute(channel);
}
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
if (read == 0)
throw new IOException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
protected void failRawContent(Throwable failure)
{
while (true)
{
int read = _contentProducer.read(b, off, len);
if (LOG.isDebugEnabled())
LOG.debug("read produced {} byte(s)", read);
if (read > 0)
return read;
if (LOG.isDebugEnabled())
LOG.debug("read error = " + _error);
if (_error != null)
throw new IOException(_error);
if (LOG.isDebugEnabled())
LOG.debug("read EOF = {}", _eof);
if (_eof.isEarly())
throw new EofException("Early EOF");
if (LOG.isDebugEnabled())
LOG.debug("read async = {}", isAsync());
if (!isAsync())
{
if (_eof.isEof())
{
_eof = Eof.CONSUMED_EOF;
if (LOG.isDebugEnabled())
LOG.debug("read on EOF, switching to CONSUMED_EOF and returning");
return -1;
}
if (LOG.isDebugEnabled())
LOG.debug("read blocked");
blockForContent();
if (LOG.isDebugEnabled())
LOG.debug("read unblocked");
}
else
{
if (_eof.isEof())
{
boolean wasInAsyncWait = _channelState.onReadEof();
if (wasInAsyncWait)
scheduleReadListenerNotification();
if (LOG.isDebugEnabled())
LOG.debug("async read on EOF (was in async wait? {}), switching to CONSUMED_EOF and returning", wasInAsyncWait);
_eof = Eof.CONSUMED_EOF;
return -1;
}
else
{
//TODO returning 0 breaks the InputStream contract. Shouldn't IOException be thrown instead?
_channelState.getHttpChannel().onAsyncWaitForContent(); // switches on fill interested
return 0;
}
}
}
}
@Override
public int available()
{
int available = _contentProducer.available();
if (LOG.isDebugEnabled())
LOG.debug("available = {}", available);
return available;
}
private void blockForContent()
{
try
{
_semaphore.acquire(() ->
{
if (LOG.isDebugEnabled())
LOG.debug("waiting for signal to wake up");
_channelState.getHttpChannel().onBlockWaitForContent(); // switches on fill interested if it blocks
});
}
catch (Throwable x)
{
_channelState.getHttpChannel().onBlockWaitForContentFailure(x);
}
}
/* Runnable */
/*
* <p> While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link
* ContextHandler#handle(Runnable)} to setup classloaders etc. </p>
*/
@Override
public void run()
{
if (LOG.isDebugEnabled())
LOG.debug("running");
if (!_contentProducer.hasRawContent())
{
if (_error != null || _eof.isEarly())
{
// TODO is this necessary to add here?
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
if (_error != null)
_readListener.onError(_error);
else
_readListener.onError(new EofException("Early EOF"));
}
else if (_eof.isEof())
{
try
{
_readListener.onAllDataRead();
}
catch (Throwable x)
{
_readListener.onError(x);
}
}
// else: !hasContent() && !error && !EOF -> no-op
}
else
{
try
{
_readListener.onDataAvailable();
}
catch (Throwable x)
{
_readListener.onError(x);
}
}
}
/**
* An {@link Interceptor} that chains two other {@link Interceptor}s together.
* The {@link Interceptor#readFrom(Content)} calls the previous {@link Interceptor}'s
* {@link Interceptor#readFrom(Content)} and then passes any {@link Content} returned
* to the next {@link Interceptor}.
*/
static class ChainedInterceptor implements Interceptor, Destroyable
{
private final Interceptor _prev;
private final Interceptor _next;
public ChainedInterceptor(Interceptor prev, Interceptor next)
{
_prev = prev;
_next = next;
}
public Interceptor getPrev()
{
return _prev;
}
public Interceptor getNext()
{
return _next;
}
@Override
public Content readFrom(Content content)
{
return getNext().readFrom(getPrev().readFrom(content));
}
@Override
public void destroy()
{
if (_prev instanceof Destroyable)
((Destroyable)_prev).destroy();
if (_next instanceof Destroyable)
((Destroyable)_next).destroy();
}
}
enum Eof
{
NOT_YET(false, false, false),
EOF(true, false, false),
CONSUMED_EOF(true, true, false),
EARLY_EOF(true, false, true),
;
private final boolean _eof;
private final boolean _consumed;
private final boolean _early;
Eof(boolean eof, boolean consumed, boolean early)
{
_eof = eof;
_consumed = consumed;
_early = early;
}
public boolean isEof()
{
return _eof;
}
public boolean isConsumed()
{
return _consumed;
}
public boolean isEarly()
{
return _early;
}
}
private static class ContentProducer
{
private final Runnable _rawContentProducer;
// TODO: think about thread visibility of the below variables
// Note: _rawContent can never be null for as long as _transformedContent is not null.
private HttpInput.Content _rawContent;
private HttpInput.Content _transformedContent;
private long _rawContentArrived;
private HttpInput.Interceptor _interceptor;
public ContentProducer(Runnable rawContentProducer)
{
_rawContentProducer = rawContentProducer;
}
void recycle()
{
if (LOG.isDebugEnabled())
LOG.debug("recycle {}", this);
if (_transformedContent == _rawContent)
_transformedContent = null;
if (_transformedContent != null && !_transformedContent.isEmpty())
_transformedContent.failed(null);
_transformedContent = null;
if (_rawContent != null && !_rawContent.isEmpty())
_rawContent.failed(null);
_rawContent = null;
_rawContentArrived = 0L;
if (_interceptor instanceof Destroyable)
((Destroyable)_interceptor).destroy();
_interceptor = null;
}
int available()
{
if (_rawContent == null)
produceRawContent();
return _rawContent == null ? 0 : _rawContent.remaining();
}
long getRawContentArrived()
{
return _rawContentArrived;
}
boolean hasRawContent()
{
return _rawContent != null;
}
boolean hasTransformedContent()
{
if (_transformedContent != null)
return true;
if (_rawContent == null)
produceRawContent();
produceTransformedContent();
return _transformedContent != null;
}
HttpInput.Interceptor getInterceptor()
{
return _interceptor;
}
void setInterceptor(HttpInput.Interceptor interceptor)
{
this._interceptor = interceptor;
}
void addInterceptor(HttpInput.Interceptor interceptor)
{
if (_interceptor == null)
_interceptor = interceptor;
else
_interceptor = new HttpInputOverHTTP.ChainedInterceptor(_interceptor, interceptor);
}
long addContent(HttpInput.Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, content);
if (content == null)
throw new AssertionError("Cannot add null content");
if (_rawContent != null)
throw new AssertionError("Cannot add new content while current one hasn't been processed");
_rawContent = content;
_rawContentArrived += content.remaining();
return _rawContentArrived;
}
void consumeAll()
{
if (LOG.isDebugEnabled())
LOG.debug("{} consumeAll", this);
// start by depleting the current _transformedContent
if (_transformedContent != null)
{
_transformedContent.skip(_transformedContent.remaining());
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
_transformedContent = null;
}
// don't bother transforming content, directly deplete the raw one
while (true)
{
if (_rawContent == null)
produceRawContent();
if (_rawContent == null)
break;
_rawContent.skip(_rawContent.remaining());
_rawContent.succeeded();
_rawContent = null;
}
}
int read(byte[] b, int off, int len)
{
if (LOG.isDebugEnabled())
LOG.debug("{} read", this);
while (_transformedContent == null)
{
if (_rawContent == null)
{
produceRawContent();
if (_rawContent == null)
return 0;
}
produceTransformedContent();
}
int read = _transformedContent.get(b, off, len);
if (_transformedContent.isEmpty())
produceTransformedContent();
return read;
}
/**
* Call the parser so that it's going to continue parsing of the request buffer, filling it with the socket's buffer
* if needed until either the request buffer is empty with no bytes left in the socket's buffer or {@link #addContent(Content)}
* is called.
*/
void produceRawContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produceRawContent", this);
_rawContentProducer.run();
}
/**
* Read {@code _rawContent} and {@code _transformedContent} to produce the next non-empty content to work with and store it in {@code _transformedContent},
* or store null in {@code _transformedContent} if there is no content to work with.
* Depleted content gets succeeded and its field nullified, which can happen for both {@code _rawContent} and {@code _transformedContent}.
*/
private void produceTransformedContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produceTransformedContent", this);
if (_interceptor == null)
{
// no interceptor set
if (_rawContent != null && _rawContent.isEmpty())
{
_rawContent.succeeded();
_rawContent = null;
_transformedContent = null;
}
else
{
_transformedContent = _rawContent;
}
}
else
{
// interceptor set
transformContent();
if (_transformedContent == null)
{
if (_rawContent != null && _rawContent.isEmpty())
{
_rawContent.succeeded();
_rawContent = null;
}
else
{
_transformedContent = _rawContent;
}
}
}
}
/**
* Read {@code _rawContent} and write {@code _transformedContent} to produce content using the interceptor.
* The produced content is guaranteed to either be null or not empty.
*/
private void transformContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} transformContent", this);
if (_rawContent == null)
return;
_transformedContent = _interceptor.readFrom(_rawContent);
if (_transformedContent != null && _transformedContent.isEmpty())
{
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
_transformedContent = null;
}
}
@Override
public String toString()
{
return getClass().getSimpleName() + "[i=" + _interceptor + ",b=" + _rawContentArrived +
",r=" + _rawContent + ",t=" + _transformedContent + "]";
}
}
private static class NotifyingSemaphore
{
private int permits;
public synchronized void acquire(Runnable onBlocking) throws InterruptedException
{
if (permits == 0)
onBlocking.run();
while (permits == 0)
wait();
permits--;
}
public synchronized void release()
{
permits++;
if (permits == 1)
notify();
if (!_contentProducer.hasRawContent())
_contentProducer.produceRawContent();
if (!_contentProducer.hasRawContent())
break;
_contentProducer.consumeRawContent();
}
}
}

View File

@ -1,180 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import javax.servlet.ReadListener;
public class HttpInputOverHTTP2 extends HttpInput
{
private final byte[] _oneByteBuffer = new byte[1];
private final HttpChannelState _channelState;
public HttpInputOverHTTP2(HttpChannelState state)
{
_channelState = state;
}
/* HttpInput */
@Override
public void recycle()
{
}
@Override
public Interceptor getInterceptor()
{
return null;
}
@Override
public void setInterceptor(Interceptor interceptor)
{
}
@Override
public void addInterceptor(Interceptor interceptor)
{
}
@Override
public void asyncReadProduce() throws IOException
{
}
@Override
public boolean addContent(Content content)
{
return false;
}
@Override
public boolean hasContent()
{
return false;
}
@Override
public void unblock()
{
}
@Override
public long getContentLength()
{
return 0;
}
@Override
public boolean earlyEOF()
{
return false;
}
@Override
public boolean eof()
{
return false;
}
@Override
public boolean consumeAll()
{
return false;
}
@Override
public boolean isError()
{
return false;
}
@Override
public boolean isAsync()
{
return false;
}
@Override
public boolean onIdleTimeout(Throwable x)
{
return false;
}
@Override
public boolean failed(Throwable x)
{
return false;
}
/* ServletInputStream */
@Override
public boolean isFinished()
{
return false;
}
@Override
public boolean isReady()
{
return false;
}
@Override
public void setReadListener(ReadListener readListener)
{
}
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
if (read == 0)
throw new IllegalStateException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
return 0;
}
@Override
public int available() throws IOException
{
return 0;
}
/* Runnable */
@Override
public void run()
{
}
}

View File

@ -48,6 +48,12 @@ public class HttpWriterTest
HttpChannel channel = new HttpChannel(new MockConnector(), new HttpConfiguration(), null, null)
{
@Override
protected HttpInput newHttpInput(HttpChannelState state)
{
return null;
}
@Override
public ByteBufferPool getByteBufferPool()
{

View File

@ -175,7 +175,14 @@ public class ResponseTest
{
_channelError = failure;
}
});
})
{
@Override
protected HttpInput newHttpInput(HttpChannelState state)
{
return new HttpInputOverHTTP(state);
}
};
}
@AfterEach