diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java index 38a46350aae..0fa1e5da408 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java @@ -18,7 +18,9 @@ package org.eclipse.jetty.fcgi.server; +import java.util.LinkedList; import java.util.Locale; +import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; @@ -36,7 +38,6 @@ import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpChannelState; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpInput; -import org.eclipse.jetty.server.HttpInputOverFCGI; import org.eclipse.jetty.server.HttpTransport; import org.eclipse.jetty.util.StringUtil; import org.slf4j.Logger; @@ -46,6 +47,8 @@ public class HttpChannelOverFCGI extends HttpChannel { private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverFCGI.class); + private final Queue _contentQueue = new LinkedList<>(); + private boolean _contentFailed; private final HttpFields.Mutable fields = HttpFields.build(); private final Dispatcher dispatcher; private String method; @@ -60,6 +63,46 @@ public class HttpChannelOverFCGI extends HttpChannel this.dispatcher = new Dispatcher(connector.getServer().getThreadPool(), this); } + void enqueueContent(HttpInput.Content content) + { + synchronized (_contentQueue) + { + if (_contentFailed) + content.failed(null); + else + _contentQueue.offer(content); + } + } + + void pushContent() + { + HttpInput.Content content; + synchronized (_contentQueue) + { + if (_contentFailed) + content = null; + else + content = _contentQueue.poll(); + } + if (content != null) + onContent(content); + } + + void failContent(Throwable failure) + { + synchronized (_contentQueue) + { + _contentFailed = true; + while (true) + { + HttpInput.Content content = _contentQueue.poll(); + if (content == null) + break; + content.failed(failure); + } + } + } + @Override protected HttpInput newHttpInput(HttpChannelState state) { diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpInputOverFCGI.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpInputOverFCGI.java new file mode 100644 index 00000000000..1c662aeb51a --- /dev/null +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpInputOverFCGI.java @@ -0,0 +1,42 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.fcgi.server; + +import org.eclipse.jetty.server.AbstractLockedHttpInput; +import org.eclipse.jetty.server.HttpChannelState; + +public class HttpInputOverFCGI extends AbstractLockedHttpInput +{ + public HttpInputOverFCGI(HttpChannelState state) + { + super(state); + } + + @Override + protected void produceRawContent() + { + ((HttpChannelOverFCGI)_channelState.getHttpChannel()).pushContent(); + } + + @Override + protected void failRawContent(Throwable failure) + { + ((HttpChannelOverFCGI)_channelState.getHttpChannel()).failContent(failure); + } +} diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java index 27b1b9ba465..820dfc3e3e8 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java @@ -197,7 +197,7 @@ public class ServerFCGIConnection extends AbstractConnection { ByteBuffer copy = ByteBuffer.allocate(buffer.remaining()); copy.put(buffer).flip(); - channel.onContent(new HttpInput.Content(copy)); + channel.enqueueContent(new HttpInput.Content(copy)); } return false; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverFCGI.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverFCGI.java deleted file mode 100644 index ad225549ad2..00000000000 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverFCGI.java +++ /dev/null @@ -1,753 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. -// -// This program and the accompanying materials are made available under -// the terms of the Eclipse Public License 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0 -// -// This Source Code may also be made available under the following -// Secondary Licenses when the conditions for such availability set -// forth in the Eclipse Public License, v. 2.0 are satisfied: -// the Apache License v2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0 -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== -// - -package org.eclipse.jetty.server; - -import java.io.IOException; -import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import javax.servlet.ReadListener; - -import org.eclipse.jetty.http.BadMessageException; -import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.io.EofException; -import org.eclipse.jetty.server.AbstractHttpInput.Eof; -import org.eclipse.jetty.util.component.Destroyable; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.AutoLock; - -public class HttpInputOverFCGI extends HttpInput -{ - private static final Logger LOG = Log.getLogger(HttpInputOverFCGI.class); - - private final byte[] _oneByteBuffer = new byte[1]; - private final HttpChannelState _channelState; - - private final AutoLock _contentLock = new AutoLock(); - private final Condition _contentLockCondition = _contentLock.newCondition(); - - private final ContentProducer _contentProducer; - private Eof _eof = Eof.NOT_YET; - private Throwable _error; - private ReadListener _readListener; - private long _firstByteTimeStamp = Long.MIN_VALUE; - - public HttpInputOverFCGI(HttpChannelState state) - { - _channelState = state; - _contentProducer = new ContentProducer(); - } - - /* HttpInput */ - - @Override - public void recycle() - { - try (AutoLock lock = _contentLock.lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("recycle"); - _contentProducer.recycle(); - _eof = Eof.NOT_YET; - _error = null; - _readListener = null; - _firstByteTimeStamp = Long.MIN_VALUE; - } - } - - @Override - public Interceptor getInterceptor() - { - try (AutoLock lock = _contentLock.lock()) - { - return _contentProducer.getInterceptor(); - } - } - - @Override - public void setInterceptor(Interceptor interceptor) - { - try (AutoLock lock = _contentLock.lock()) - { - _contentProducer.setInterceptor(interceptor); - } - } - - @Override - public void addInterceptor(Interceptor interceptor) - { - try (AutoLock lock = _contentLock.lock()) - { - _contentProducer.addInterceptor(interceptor); - } - } - - @Override - public void asyncReadProduce() - { - try (AutoLock lock = _contentLock.lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("asyncReadProduce {}", _contentProducer); - _contentProducer.produceRawContent(); - } - } - - @Override - public boolean addContent(Content content) - { - try (AutoLock lock = _contentLock.lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("addContent {} {}", content, _contentProducer); - if (_firstByteTimeStamp == Long.MIN_VALUE) - { - _firstByteTimeStamp = System.nanoTime(); - if (_firstByteTimeStamp == Long.MIN_VALUE) - _firstByteTimeStamp++; - } - long contentArrived = _contentProducer.addContent(content); - long requestContentLength = _channelState.getHttpChannel().getRequest().getContentLengthLong(); - // return false to make the parser go on, true to make it stop - // -> tell the parser to stop adding content, unless we have read everything - boolean stopParsing = requestContentLength == -1 || contentArrived < requestContentLength; - if (isAsync()) - _channelState.onContentAdded(); - return stopParsing; - } - } - - @Override - public boolean hasContent() - { - try (AutoLock lock = _contentLock.lock()) - { - return _contentProducer.hasRawContent(); - } - } - - @Override - public void unblock() - { - try (AutoLock lock = _contentLock.lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("signalling blocked thread to wake up"); - _contentLockCondition.signal(); - } - } - - @Override - public long getContentLength() - { - try (AutoLock lock = _contentLock.lock()) - { - return _contentProducer.getRawContentArrived(); - } - } - - @Override - public boolean earlyEOF() - { - try (AutoLock lock = _contentLock.lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("received early EOF"); - _eof = Eof.EARLY_EOF; - if (isAsync()) - return _channelState.onContentAdded(); - unblock(); - return false; - } - } - - @Override - public boolean eof() - { - try (AutoLock lock = _contentLock.lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("received EOF"); - _eof = Eof.EOF; - if (isAsync()) - return _channelState.onContentAdded(); - unblock(); - return false; - } - } - - @Override - public boolean consumeAll() - { - try (AutoLock lock = _contentLock.lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("consume all"); - _contentProducer.consumeAll(); - if (_eof.isEof()) - _eof = Eof.CONSUMED_EOF; - - if (isFinished()) - return !isError(); - - _eof = Eof.EARLY_EOF; - return false; - } - } - - @Override - public boolean isError() - { - try (AutoLock lock = _contentLock.lock()) - { - return _error != null; - } - } - - @Override - public boolean isAsync() - { - try (AutoLock lock = _contentLock.lock()) - { - return _readListener != null; - } - } - - @Override - public boolean onIdleTimeout(Throwable x) - { - try (AutoLock lock = _contentLock.lock()) - { - boolean neverDispatched = _channelState.isIdle(); - boolean waitingForContent = !_contentProducer.hasTransformedContent() && !_eof.isEof(); - if ((waitingForContent || neverDispatched) && !isError()) - { - x.addSuppressed(new Throwable("HttpInput idle timeout")); - _error = x; - if (isAsync()) - return _channelState.onContentAdded(); - unblock(); - } - return false; - } - } - - @Override - public boolean failed(Throwable x) - { - try (AutoLock lock = _contentLock.lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("failed " + x); - if (_error != null) - _error.addSuppressed(x); - else - _error = x; - - if (isAsync()) - return _channelState.onContentAdded(); - unblock(); - return false; - } - } - - /* ServletInputStream */ - - @Override - public boolean isFinished() - { - try (AutoLock lock = _contentLock.lock()) - { - boolean finished = !_contentProducer.hasRawContent() && _eof.isConsumed(); - if (LOG.isDebugEnabled()) - LOG.debug("isFinished? {}", finished); - return finished; - } - } - - @Override - public boolean isReady() - { - try (AutoLock lock = _contentLock.lock()) - { - // calling _contentProducer.hasTransformedContent() might change the _eof state, so the following test order matters - if (_contentProducer.hasTransformedContent() || _eof.isEof()) - { - if (LOG.isDebugEnabled()) - LOG.debug("isReady? true"); - return true; - } - if (LOG.isDebugEnabled()) - LOG.debug("isReady? false"); - _channelState.onReadUnready(); - return false; - } - } - - @Override - public void setReadListener(ReadListener readListener) - { - try (AutoLock lock = _contentLock.lock()) - { - if (_readListener != null) - throw new IllegalStateException("ReadListener already set"); - _readListener = Objects.requireNonNull(readListener); - - if (LOG.isDebugEnabled()) - LOG.debug("setReadListener error=" + _error + " eof=" + _eof + " " + _contentProducer); - boolean woken; - if (isError()) - { - woken = _channelState.onReadReady(); - } - else - { - if (_contentProducer.hasTransformedContent()) - { - woken = _channelState.onReadReady(); - } - else if (_eof.isEof()) - { - woken = _channelState.onReadEof(); - } - else - { - _channelState.onReadUnready(); - woken = false; - } - } - - if (LOG.isDebugEnabled()) - LOG.debug("setReadListener woken=" + woken); - if (woken) - scheduleReadListenerNotification(); - } - } - - private void scheduleReadListenerNotification() - { - HttpChannel channel = _channelState.getHttpChannel(); - channel.execute(channel); - } - - @Override - public int read() throws IOException - { - int read = read(_oneByteBuffer, 0, 1); - if (read == 0) - throw new IOException("unready read=0"); - return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException - { - try (AutoLock lock = _contentLock.lock()) - { - // Calculate minimum request rate for DOS protection - long minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate(); - if (minRequestDataRate > 0 && _firstByteTimeStamp != Long.MIN_VALUE) - { - long period = System.nanoTime() - _firstByteTimeStamp; - if (period > 0) - { - long minimumData = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1); - if (_contentProducer.getRawContentArrived() < minimumData) - { - BadMessageException bad = new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408, - String.format("Request content data rate < %d B/s", minRequestDataRate)); - if (_channelState.isResponseCommitted()) - _channelState.getHttpChannel().abort(bad); - throw bad; - } - } - } - - while (true) - { - int read = _contentProducer.read(b, off, len); - if (LOG.isDebugEnabled()) - LOG.debug("read produced {} byte(s)", read); - if (read > 0) - return read; - - if (LOG.isDebugEnabled()) - LOG.debug("read error = " + _error); - if (_error != null) - throw new IOException(_error); - - if (LOG.isDebugEnabled()) - LOG.debug("read EOF = {}", _eof); - if (_eof.isEarly()) - throw new EofException("Early EOF"); - - if (LOG.isDebugEnabled()) - LOG.debug("read async = {}", isAsync()); - if (!isAsync()) - { - if (_eof.isEof()) - { - _eof = Eof.CONSUMED_EOF; - if (LOG.isDebugEnabled()) - LOG.debug("read on EOF, switching to CONSUMED_EOF and returning"); - return -1; - } - if (LOG.isDebugEnabled()) - LOG.debug("read blocked"); - blockForContent(); - if (LOG.isDebugEnabled()) - LOG.debug("read unblocked"); - } - else - { - if (_eof.isEof()) - { - boolean wasInAsyncWait = _channelState.onReadEof(); - if (wasInAsyncWait) - scheduleReadListenerNotification(); - if (LOG.isDebugEnabled()) - LOG.debug("async read on EOF (was in async wait? {}), switching to CONSUMED_EOF and returning", wasInAsyncWait); - _eof = Eof.CONSUMED_EOF; - return -1; - } - else - { - //TODO returning 0 breaks the InputStream contract. Shouldn't IOException be thrown instead? - _channelState.getHttpChannel().onAsyncWaitForContent(); // switches on fill interested - return 0; - } - } - } - } - } - - @Override - public int available() - { - try (AutoLock lock = _contentLock.lock()) - { - int available = _contentProducer.available(); - if (LOG.isDebugEnabled()) - LOG.debug("available = {}", available); - return available; - } - } - - private void blockForContent() - { - try - { - _channelState.getHttpChannel().onBlockWaitForContent(); // switches on fill interested - if (LOG.isDebugEnabled()) - LOG.debug("waiting for signal to wake up"); - _contentLockCondition.await(); - if (LOG.isDebugEnabled()) - LOG.debug("signalled to wake up"); - } - catch (Throwable x) - { - _channelState.getHttpChannel().onBlockWaitForContentFailure(x); - } - } - - /* Runnable */ - - /* - *

While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link - * ContextHandler#handle(Runnable)} to setup classloaders etc.

- */ - @Override - public void run() - { - try (AutoLock lock = _contentLock.lock()) - { - if (!_contentProducer.hasRawContent()) - { - if (LOG.isDebugEnabled()) - LOG.debug("running has no raw content; error: {}, EOF = {}", _error, _eof); - if (_error != null || _eof.isEarly()) - { - // TODO is this necessary to add here? - _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); - if (_error != null) - _readListener.onError(_error); - else - _readListener.onError(new EofException("Early EOF")); - } - else if (_eof.isEof()) - { - try - { - _readListener.onAllDataRead(); - } - catch (Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("running failed onAllDataRead", x); - _readListener.onError(x); - } - } - // else: !hasContent() && !error && !EOF -> no-op - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("running has raw content"); - try - { - _readListener.onDataAvailable(); - } - catch (Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("running failed onDataAvailable", x); - _readListener.onError(x); - } - } - } - } - - private static class ContentProducer - { - // Note: _rawContent can never be null for as long as _transformedContent is not null. - private final Queue _rawContentQueue = new LinkedBlockingQueue<>(); - private HttpInput.Content _currentRawContent; - private HttpInput.Content _transformedContent; - private long _rawContentArrived; - private HttpInput.Interceptor _interceptor; - - void recycle() - { - if (LOG.isDebugEnabled()) - LOG.debug("recycle {}", this); - if (_transformedContent == _currentRawContent) - _transformedContent = null; - if (_transformedContent != null && !_transformedContent.isEmpty()) - _transformedContent.failed(null); - _transformedContent = null; - if (_currentRawContent != null && !_currentRawContent.isEmpty()) - _currentRawContent.failed(null); - _currentRawContent = null; - _rawContentQueue.forEach(c -> c.failed(null)); - _rawContentQueue.clear(); - _rawContentArrived = 0L; - if (_interceptor instanceof Destroyable) - ((Destroyable)_interceptor).destroy(); - _interceptor = null; - } - - //TODO: factor out similarities with read and hasTransformedContent - int available() - { - if (_transformedContent != null) - return _transformedContent.remaining(); - if (_currentRawContent == null) - produceRawContent(); - produceTransformedContent(); - return _transformedContent == null ? 0 : _transformedContent.remaining(); - } - - long getRawContentArrived() - { - return _rawContentArrived; - } - - boolean hasRawContent() - { - if (_currentRawContent == null) - produceRawContent(); - return _currentRawContent != null; - } - - boolean hasTransformedContent() - { - if (_transformedContent != null) - return true; - if (_currentRawContent == null) - produceRawContent(); - produceTransformedContent(); - return _transformedContent != null; - } - - HttpInput.Interceptor getInterceptor() - { - return _interceptor; - } - - void setInterceptor(HttpInput.Interceptor interceptor) - { - this._interceptor = interceptor; - } - - void addInterceptor(HttpInput.Interceptor interceptor) - { - if (_interceptor == null) - _interceptor = interceptor; - else - _interceptor = new HttpInputOverHTTP.ChainedInterceptor(_interceptor, interceptor); - } - - long addContent(HttpInput.Content content) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} addContent {}", this, content); - if (content == null) - throw new AssertionError("Cannot add null content"); - - _rawContentQueue.offer(content); - _rawContentArrived += content.remaining(); - - return _rawContentArrived; - } - - void consumeAll() - { - if (LOG.isDebugEnabled()) - LOG.debug("{} consumeAll", this); - // start by depleting the current _transformedContent - if (_transformedContent != null) - { - _transformedContent.skip(_transformedContent.remaining()); - if (_transformedContent != _currentRawContent) - _transformedContent.succeeded(); - _transformedContent = null; - } - - // don't bother transforming content, directly deplete the raw one - while (true) - { - if (_currentRawContent == null) - produceRawContent(); - if (_currentRawContent == null) - break; - - _currentRawContent.skip(_currentRawContent.remaining()); - _currentRawContent.succeeded(); - _currentRawContent = null; - } - } - - int read(byte[] b, int off, int len) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} read", this); - while (_transformedContent == null) - { - if (_currentRawContent == null) - { - produceRawContent(); - if (_currentRawContent == null) - return 0; - } - produceTransformedContent(); - } - - int read = _transformedContent.get(b, off, len); - if (_transformedContent.isEmpty()) - produceTransformedContent(); - - return read; - } - - /** - * Call the parser so that it's going to continue parsing of the request buffer, filling it with the socket's buffer - * if needed until either the request buffer is empty with no bytes left in the socket's buffer or {@link #addContent(Content)} - * is called. - */ - void produceRawContent() - { - if (LOG.isDebugEnabled()) - LOG.debug("{} produceRawContent", this); - _currentRawContent = _rawContentQueue.poll(); - } - - /** - * Read {@code _rawContent} and {@code _transformedContent} to produce the next non-empty content to work with and store it in {@code _transformedContent}, - * or store null in {@code _transformedContent} if there is no content to work with. - * Depleted content gets succeeded and its field nullified, which can happen for both {@code _rawContent} and {@code _transformedContent}. - */ - private void produceTransformedContent() - { - if (LOG.isDebugEnabled()) - LOG.debug("{} produceTransformedContent", this); - if (_interceptor == null) - { - // no interceptor set - if (_currentRawContent != null && _currentRawContent.isEmpty()) - { - _currentRawContent.succeeded(); - _currentRawContent = null; - _transformedContent = null; - } - else - { - _transformedContent = _currentRawContent; - } - } - else - { - // interceptor set - transformContent(); - if (_transformedContent == null) - { - if (_currentRawContent != null && _currentRawContent.isEmpty()) - { - _currentRawContent.succeeded(); - _currentRawContent = null; - } - else - { - _transformedContent = _currentRawContent; - } - } - } - } - - /** - * Read {@code _rawContent} and write {@code _transformedContent} to produce content using the interceptor. - * The produced content is guaranteed to either be null or not empty. - */ - private void transformContent() - { - if (LOG.isDebugEnabled()) - LOG.debug("{} transformContent", this); - if (_currentRawContent == null) - return; - - _transformedContent = _interceptor.readFrom(_currentRawContent); - - if (_transformedContent != null && _transformedContent.isEmpty()) - { - if (_transformedContent != _currentRawContent) - _transformedContent.succeeded(); - _transformedContent = null; - } - } - - @Override - public String toString() - { - return getClass().getSimpleName() + "[i=" + _interceptor + ",b=" + _rawContentArrived + - ",r=" + _currentRawContent + ",t=" + _transformedContent + "]"; - } - } -}